From f71cd2c6f34a9ae5ec0ce8d8e296515e80a4b00d Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Sat, 16 Feb 2019 16:41:03 -0800 Subject: [PATCH] Status cache runs out of space in the bloom filter (#2796) The cache is designed for 1m statuses, about 1 second worth of transactions at full capacity. Refresh the cache every 1 second worth of ticks. --- src/bank.rs | 19 ++++++++++------- src/status_cache.rs | 51 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 28d879420d..e83a94013e 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -7,6 +7,7 @@ use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionL use crate::counter::Counter; use crate::genesis_block::GenesisBlock; use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; +use crate::poh_service::NUM_TICKS_PER_SECOND; use crate::rpc_pubsub::RpcSubscriptions; use crate::status_cache::StatusCache; use bincode::deserialize; @@ -124,11 +125,9 @@ impl Bank { } pub fn copy_for_tpu(&self) -> Self { - let mut status_cache = BankStatusCache::default(); - status_cache.merge_into_root(self.status_cache.read().unwrap().clone()); Self { accounts: self.accounts.copy_for_tpu(), - status_cache: RwLock::new(status_cache), + status_cache: RwLock::new(self.status_cache.read().unwrap().clone()), last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()), subscriptions: RwLock::new(None), } @@ -295,9 +294,16 @@ impl Bank { /// the oldest ones once its internal cache is full. Once boot, the /// bank will reject transactions using that `last_id`. pub fn register_tick(&self, last_id: &Hash) { - let mut last_id_queue = self.last_id_queue.write().unwrap(); - inc_new_counter_info!("bank-register_tick-registered", 1); - last_id_queue.register_tick(last_id); + let current_tick_height = { + //atomic register and read the tick + let mut last_id_queue = self.last_id_queue.write().unwrap(); + inc_new_counter_info!("bank-register_tick-registered", 1); + last_id_queue.register_tick(last_id); + last_id_queue.tick_height + }; + if current_tick_height % NUM_TICKS_PER_SECOND as u64 == 0 { + self.status_cache.write().unwrap().new_cache(last_id); + } } /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. @@ -637,7 +643,6 @@ mod tests { use solana_sdk::system_instruction::SystemInstruction; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Instruction; - use std; #[test] fn test_bank_new() { diff --git a/src/status_cache.rs b/src/status_cache.rs index e1fa0e9a1c..49bc109b20 100644 --- a/src/status_cache.rs +++ b/src/status_cache.rs @@ -1,11 +1,15 @@ use crate::bloom::{Bloom, BloomHashIndex}; use crate::last_id_queue::MAX_ENTRY_IDS; +use crate::poh_service::NUM_TICKS_PER_SECOND; use hashbrown::HashMap; use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; use std::collections::VecDeque; use std::ops::{Deref, DerefMut}; +/// This cache is designed to last 1 second +const MAX_CACHE_ENTRIES: usize = MAX_ENTRY_IDS / NUM_TICKS_PER_SECOND; + type FailureMap = HashMap; #[derive(Clone)] @@ -60,6 +64,7 @@ impl StatusCache { pub fn clear(&mut self) { self.failures.clear(); self.signatures.clear(); + self.merges = VecDeque::new(); } fn get_signature_status_merged(&self, sig: &Signature) -> Option> { for c in &self.merges { @@ -85,8 +90,19 @@ impl StatusCache { // which cannot be rolled back assert!(other.merges.is_empty()); self.merges.push_front(other); - if self.merges.len() > MAX_ENTRY_IDS { - //TODO check if this is the right size ^ + if self.merges.len() > MAX_CACHE_ENTRIES { + self.merges.pop_back(); + } + } + + /// Crate a new cache, pushing the old cache into the merged queue + pub fn new_cache(&mut self, last_id: &Hash) { + let mut old = Self::new(last_id); + std::mem::swap(&mut old.signatures, &mut self.signatures); + std::mem::swap(&mut old.failures, &mut self.failures); + assert!(old.merges.is_empty()); + self.merges.push_front(old); + if self.merges.len() > MAX_CACHE_ENTRIES { self.merges.pop_back(); } } @@ -162,6 +178,37 @@ mod tests { assert!(StatusCache::has_signature_all(&checkpoints, &sig)); } + #[test] + fn test_new_cache() { + let sig = Signature::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = BankStatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.get_signature_status(&sig), Some(Ok(()))); + let last_id = hash(last_id.as_ref()); + first.new_cache(&last_id); + assert_eq!(first.get_signature_status(&sig), Some(Ok(())),); + assert!(first.has_signature(&sig)); + first.clear(); + assert_eq!(first.get_signature_status(&sig), None); + assert!(!first.has_signature(&sig)); + } + + #[test] + fn test_new_cache_full() { + let sig = Signature::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = BankStatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.get_signature_status(&sig), Some(Ok(()))); + for _ in 0..(MAX_CACHE_ENTRIES + 1) { + let last_id = hash(last_id.as_ref()); + first.new_cache(&last_id); + } + assert_eq!(first.get_signature_status(&sig), None); + assert!(!first.has_signature(&sig)); + } + #[test] fn test_has_signature_merged1() { let sig = Signature::default();