Status cache runs out of space in the bloom filter (#2796)
The cache is designed for 1m statuses, about 1 second worth of transactions at full capacity. Refresh the cache every 1 second worth of ticks.
This commit is contained in:
parent
8ec1f6ea2e
commit
f71cd2c6f3
19
src/bank.rs
19
src/bank.rs
|
@ -7,6 +7,7 @@ use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionL
|
||||||
use crate::counter::Counter;
|
use crate::counter::Counter;
|
||||||
use crate::genesis_block::GenesisBlock;
|
use crate::genesis_block::GenesisBlock;
|
||||||
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
|
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
|
||||||
|
use crate::poh_service::NUM_TICKS_PER_SECOND;
|
||||||
use crate::rpc_pubsub::RpcSubscriptions;
|
use crate::rpc_pubsub::RpcSubscriptions;
|
||||||
use crate::status_cache::StatusCache;
|
use crate::status_cache::StatusCache;
|
||||||
use bincode::deserialize;
|
use bincode::deserialize;
|
||||||
|
@ -124,11 +125,9 @@ impl Bank {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn copy_for_tpu(&self) -> Self {
|
pub fn copy_for_tpu(&self) -> Self {
|
||||||
let mut status_cache = BankStatusCache::default();
|
|
||||||
status_cache.merge_into_root(self.status_cache.read().unwrap().clone());
|
|
||||||
Self {
|
Self {
|
||||||
accounts: self.accounts.copy_for_tpu(),
|
accounts: self.accounts.copy_for_tpu(),
|
||||||
status_cache: RwLock::new(status_cache),
|
status_cache: RwLock::new(self.status_cache.read().unwrap().clone()),
|
||||||
last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
|
last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
|
||||||
subscriptions: RwLock::new(None),
|
subscriptions: RwLock::new(None),
|
||||||
}
|
}
|
||||||
|
@ -295,9 +294,16 @@ impl Bank {
|
||||||
/// the oldest ones once its internal cache is full. Once boot, the
|
/// the oldest ones once its internal cache is full. Once boot, the
|
||||||
/// bank will reject transactions using that `last_id`.
|
/// bank will reject transactions using that `last_id`.
|
||||||
pub fn register_tick(&self, last_id: &Hash) {
|
pub fn register_tick(&self, last_id: &Hash) {
|
||||||
let mut last_id_queue = self.last_id_queue.write().unwrap();
|
let current_tick_height = {
|
||||||
inc_new_counter_info!("bank-register_tick-registered", 1);
|
//atomic register and read the tick
|
||||||
last_id_queue.register_tick(last_id);
|
let mut last_id_queue = self.last_id_queue.write().unwrap();
|
||||||
|
inc_new_counter_info!("bank-register_tick-registered", 1);
|
||||||
|
last_id_queue.register_tick(last_id);
|
||||||
|
last_id_queue.tick_height
|
||||||
|
};
|
||||||
|
if current_tick_height % NUM_TICKS_PER_SECOND as u64 == 0 {
|
||||||
|
self.status_cache.write().unwrap().new_cache(last_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
|
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
|
||||||
|
@ -637,7 +643,6 @@ mod tests {
|
||||||
use solana_sdk::system_instruction::SystemInstruction;
|
use solana_sdk::system_instruction::SystemInstruction;
|
||||||
use solana_sdk::system_transaction::SystemTransaction;
|
use solana_sdk::system_transaction::SystemTransaction;
|
||||||
use solana_sdk::transaction::Instruction;
|
use solana_sdk::transaction::Instruction;
|
||||||
use std;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bank_new() {
|
fn test_bank_new() {
|
||||||
|
|
|
@ -1,11 +1,15 @@
|
||||||
use crate::bloom::{Bloom, BloomHashIndex};
|
use crate::bloom::{Bloom, BloomHashIndex};
|
||||||
use crate::last_id_queue::MAX_ENTRY_IDS;
|
use crate::last_id_queue::MAX_ENTRY_IDS;
|
||||||
|
use crate::poh_service::NUM_TICKS_PER_SECOND;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::signature::Signature;
|
use solana_sdk::signature::Signature;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
|
/// This cache is designed to last 1 second
|
||||||
|
const MAX_CACHE_ENTRIES: usize = MAX_ENTRY_IDS / NUM_TICKS_PER_SECOND;
|
||||||
|
|
||||||
type FailureMap<T> = HashMap<Signature, T>;
|
type FailureMap<T> = HashMap<Signature, T>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -60,6 +64,7 @@ impl<T: Clone> StatusCache<T> {
|
||||||
pub fn clear(&mut self) {
|
pub fn clear(&mut self) {
|
||||||
self.failures.clear();
|
self.failures.clear();
|
||||||
self.signatures.clear();
|
self.signatures.clear();
|
||||||
|
self.merges = VecDeque::new();
|
||||||
}
|
}
|
||||||
fn get_signature_status_merged(&self, sig: &Signature) -> Option<Result<(), T>> {
|
fn get_signature_status_merged(&self, sig: &Signature) -> Option<Result<(), T>> {
|
||||||
for c in &self.merges {
|
for c in &self.merges {
|
||||||
|
@ -85,8 +90,19 @@ impl<T: Clone> StatusCache<T> {
|
||||||
// which cannot be rolled back
|
// which cannot be rolled back
|
||||||
assert!(other.merges.is_empty());
|
assert!(other.merges.is_empty());
|
||||||
self.merges.push_front(other);
|
self.merges.push_front(other);
|
||||||
if self.merges.len() > MAX_ENTRY_IDS {
|
if self.merges.len() > MAX_CACHE_ENTRIES {
|
||||||
//TODO check if this is the right size ^
|
self.merges.pop_back();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Crate a new cache, pushing the old cache into the merged queue
|
||||||
|
pub fn new_cache(&mut self, last_id: &Hash) {
|
||||||
|
let mut old = Self::new(last_id);
|
||||||
|
std::mem::swap(&mut old.signatures, &mut self.signatures);
|
||||||
|
std::mem::swap(&mut old.failures, &mut self.failures);
|
||||||
|
assert!(old.merges.is_empty());
|
||||||
|
self.merges.push_front(old);
|
||||||
|
if self.merges.len() > MAX_CACHE_ENTRIES {
|
||||||
self.merges.pop_back();
|
self.merges.pop_back();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,6 +178,37 @@ mod tests {
|
||||||
assert!(StatusCache::has_signature_all(&checkpoints, &sig));
|
assert!(StatusCache::has_signature_all(&checkpoints, &sig));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_cache() {
|
||||||
|
let sig = Signature::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = BankStatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())));
|
||||||
|
let last_id = hash(last_id.as_ref());
|
||||||
|
first.new_cache(&last_id);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())),);
|
||||||
|
assert!(first.has_signature(&sig));
|
||||||
|
first.clear();
|
||||||
|
assert_eq!(first.get_signature_status(&sig), None);
|
||||||
|
assert!(!first.has_signature(&sig));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_cache_full() {
|
||||||
|
let sig = Signature::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = BankStatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())));
|
||||||
|
for _ in 0..(MAX_CACHE_ENTRIES + 1) {
|
||||||
|
let last_id = hash(last_id.as_ref());
|
||||||
|
first.new_cache(&last_id);
|
||||||
|
}
|
||||||
|
assert_eq!(first.get_signature_status(&sig), None);
|
||||||
|
assert!(!first.has_signature(&sig));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_has_signature_merged1() {
|
fn test_has_signature_merged1() {
|
||||||
let sig = Signature::default();
|
let sig = Signature::default();
|
||||||
|
|
Loading…
Reference in New Issue