From 2754ceec608db58a11f17cfc45ec59963b7a7c7f Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Thu, 31 Jan 2019 06:53:52 -0800 Subject: [PATCH] StatusDeque split into separate objects with their own root checkpoint strategy (#2613) Split up StatusDeque into different modules * LastIdQueue tracks last_ids * StatusCache keeps track of signature statuses * StatusCache stores success as a bit in a bloom filter * Overhead for 1m Ok transactions is 4mb in memory * Less concurrency between the objects, last_id and status_cache are read and written to at different points in the pipeline * Each object has its own strategy for merging into the root checkpoint --- benches/bank.rs | 2 +- benches/banking_stage.rs | 2 +- src/bank.rs | 139 ++++++++++-------- src/bloom.rs | 37 +---- src/crds_gossip_pull.rs | 2 +- src/last_id_queue.rs | 213 +++++++++++++++++++++++++++ src/lib.rs | 3 +- src/rpc.rs | 26 +--- src/rpc_pubsub.rs | 3 +- src/status_cache.rs | 238 ++++++++++++++++++++++++++++++ src/status_deque.rs | 307 --------------------------------------- tests/programs.rs | 6 +- 12 files changed, 545 insertions(+), 433 deletions(-) create mode 100644 src/last_id_queue.rs create mode 100644 src/status_cache.rs delete mode 100644 src/status_deque.rs diff --git a/benches/bank.rs b/benches/bank.rs index 84934f1f91..5a068dc312 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -4,7 +4,7 @@ extern crate test; use solana::bank::*; use solana::genesis_block::GenesisBlock; -use solana::status_deque::MAX_ENTRY_IDS; +use solana::last_id_queue::MAX_ENTRY_IDS; use solana_sdk::hash::hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 32f60a9906..b93f0f4b5d 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -8,8 +8,8 @@ use solana::bank::Bank; use solana::banking_stage::BankingStage; use solana::entry::Entry; use solana::genesis_block::GenesisBlock; +use solana::last_id_queue::MAX_ENTRY_IDS; use solana::packet::to_packets_chunked; -use solana::status_deque::MAX_ENTRY_IDS; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; diff --git a/src/bank.rs b/src/bank.rs index f65e5e90c5..a7e329b075 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -8,10 +8,11 @@ use crate::counter::Counter; use crate::entry::Entry; use crate::entry::EntrySlice; use crate::genesis_block::GenesisBlock; +use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; use crate::leader_scheduler::LeaderScheduler; use crate::poh_recorder::PohRecorder; use crate::runtime::{self, RuntimeError}; -use crate::status_deque::{Status, StatusDeque, StatusDequeError, MAX_ENTRY_IDS}; +use crate::status_cache::StatusCache; use bincode::deserialize; use itertools::Itertools; use log::Level; @@ -96,12 +97,17 @@ impl BankSubscriptions for LocalSubscriptions { fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {} } +type BankStatusCache = StatusCache; + /// Manager for the state of all accounts and programs after processing its entries. pub struct Bank { pub accounts: Accounts, + /// A cache of signature statuses + status_cache: RwLock, + /// FIFO queue of `last_id` items - last_ids: RwLock>>, + last_id_queue: RwLock, // The latest confirmation time for the network confirmation_time: AtomicUsize, @@ -117,7 +123,8 @@ impl Default for Bank { fn default() -> Self { Bank { accounts: Accounts::default(), - last_ids: RwLock::new(StatusDeque::default()), + last_id_queue: RwLock::new(LastIdQueue::default()), + status_cache: RwLock::new(BankStatusCache::default()), confirmation_time: AtomicUsize::new(std::usize::MAX), leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))), @@ -138,9 +145,12 @@ impl Bank { } pub fn copy_for_tpu(&self) -> Self { + let mut status_cache = BankStatusCache::default(); + status_cache.merge_into_root(self.status_cache.read().unwrap().clone()); Self { accounts: self.accounts.copy_for_tpu(), - last_ids: RwLock::new(self.last_ids.read().unwrap().clone()), + status_cache: RwLock::new(status_cache), + last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()), confirmation_time: AtomicUsize::new(self.confirmation_time()), leader_scheduler: self.leader_scheduler.clone(), subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))), @@ -255,7 +265,7 @@ impl Bank { /// Return the last entry ID registered. pub fn last_id(&self) -> Hash { - self.last_ids + self.last_id_queue .read() .unwrap() .last_id @@ -291,19 +301,32 @@ impl Bank { /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { - self.last_ids.write().unwrap().clear_signatures(); + self.status_cache.write().unwrap().clear(); } - fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { - let mut last_ids = self.last_ids.write().unwrap(); + fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) { for (i, tx) in txs.iter().enumerate() { - last_ids.update_signature_status_with_last_id(&tx.signatures[0], &res[i], &tx.last_id); self.subscriptions .read() .unwrap() .check_signature(&tx.signatures[0], &res[i]); } } + fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { + let mut status_cache = self.status_cache.write().unwrap(); + for (i, tx) in txs.iter().enumerate() { + match &res[i] { + Ok(_) => status_cache.add(&tx.signatures[0]), + Err(BankError::LastIdNotFound) => (), + Err(BankError::DuplicateSignature) => (), + Err(BankError::AccountNotFound) => (), + Err(e) => { + status_cache.add(&tx.signatures[0]); + status_cache.save_failure_status(&tx.signatures[0], e.clone()); + } + } + } + } /// Looks through a list of tick heights and stakes, and finds the latest /// tick that has achieved confirmation @@ -312,7 +335,7 @@ impl Bank { ticks_and_stakes: &mut [(u64, u64)], supermajority_stake: u64, ) -> Option { - let last_ids = self.last_ids.read().unwrap(); + let last_ids = self.last_id_queue.read().unwrap(); last_ids.get_confirmation_timestamp(ticks_and_stakes, supermajority_stake) } @@ -321,9 +344,9 @@ impl Bank { /// the oldest ones once its internal cache is full. Once boot, the /// bank will reject transactions using that `last_id`. pub fn register_tick(&self, last_id: &Hash) { - let mut last_ids = self.last_ids.write().unwrap(); + let mut last_id_queue = self.last_id_queue.write().unwrap(); inc_new_counter_info!("bank-register_tick-registered", 1); - last_ids.register_tick(last_id) + last_id_queue.register_tick(last_id) } /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. @@ -436,33 +459,39 @@ impl Bank { ) -> Vec> { Accounts::load_accounts(&[&self.accounts], txs, results, error_counters) } - fn check_signatures( + fn check_age( &self, txs: &[Transaction], lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, ) -> Vec> { - let mut last_ids = self.last_ids.write().unwrap(); + let last_ids = self.last_id_queue.read().unwrap(); txs.iter() .zip(lock_results.into_iter()) .map(|(tx, lock_res)| { - if lock_res.is_ok() { - let r = if !last_ids.check_entry_id_age(tx.last_id, max_age) { - Err(StatusDequeError::LastIdNotFound) - } else { - last_ids.reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0]) - }; - r.map_err(|err| match err { - StatusDequeError::LastIdNotFound => { - error_counters.reserve_last_id += 1; - BankError::LastIdNotFound - } - StatusDequeError::DuplicateSignature => { - error_counters.duplicate_signature += 1; - BankError::DuplicateSignature - } - }) + if lock_res.is_ok() && !last_ids.check_entry_id_age(tx.last_id, max_age) { + error_counters.reserve_last_id += 1; + Err(BankError::LastIdNotFound) + } else { + lock_res + } + }) + .collect() + } + fn check_signatures( + &self, + txs: &[Transaction], + lock_results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let status_cache = self.status_cache.read().unwrap(); + txs.iter() + .zip(lock_results.into_iter()) + .map(|(tx, lock_res)| { + if lock_res.is_ok() && status_cache.has_signature(&tx.signatures[0]) { + error_counters.duplicate_signature += 1; + Err(BankError::DuplicateSignature) } else { lock_res } @@ -482,7 +511,8 @@ impl Bank { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); - let sig_results = self.check_signatures(txs, lock_results, max_age, &mut error_counters); + let age_results = self.check_age(txs, lock_results, max_age, &mut error_counters); + let sig_results = self.check_signatures(txs, age_results, &mut error_counters); let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); let tick_height = self.tick_height(); @@ -583,6 +613,7 @@ impl Bank { txs.len(), ); self.update_transaction_statuses(txs, &executed); + self.update_subscriptions(txs, &executed); } /// Process a batch of transactions. @@ -736,7 +767,8 @@ impl Bank { { let mut entry_height = 0; let mut last_id = genesis_block.last_id(); - self.last_ids = RwLock::new(StatusDeque::default()); + self.last_id_queue = RwLock::new(LastIdQueue::default()); + self.status_cache = RwLock::new(BankStatusCache::default()); // Ledger verification needs to be parallelized, but we can't pull the whole // thing into memory. We therefore chunk it. @@ -795,26 +827,15 @@ impl Bank { self.accounts.transaction_count() } - pub fn get_signature_status(&self, signature: &Signature) -> Option>> { - self.last_ids + pub fn get_signature_status(&self, signature: &Signature) -> Option> { + self.status_cache .read() .unwrap() .get_signature_status(signature) } pub fn has_signature(&self, signature: &Signature) -> bool { - self.last_ids.read().unwrap().has_signature(signature) - } - - pub fn get_signature( - &self, - last_id: &Hash, - signature: &Signature, - ) -> Option>> { - self.last_ids - .read() - .unwrap() - .get_signature(last_id, signature) + self.status_cache.read().unwrap().has_signature(signature) } /// Hash the `accounts` HashMap. This represents a validator's interpretation @@ -862,7 +883,7 @@ impl Bank { } pub fn tick_height(&self) -> u64 { - self.last_ids.read().unwrap().tick_height + self.last_id_queue.read().unwrap().tick_height } } @@ -936,14 +957,11 @@ mod tests { assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0); assert_eq!(bank.get_balance(&key1), 1); assert_eq!(bank.get_balance(&key2), 0); - assert_eq!( - bank.get_signature(&t1.last_id, &t1.signatures[0]), - Some(Status::Complete(Ok(()))) - ); + assert_eq!(bank.get_signature_status(&t1.signatures[0]), Some(Ok(()))); // TODO: Transactions that fail to pay a fee could be dropped silently assert_eq!( - bank.get_signature(&t2.last_id, &t2.signatures[0]), - Some(Status::Complete(Err(BankError::AccountInUse))) + bank.get_signature_status(&t2.signatures[0]), + Some(Err(BankError::AccountInUse)) ); } @@ -988,11 +1006,11 @@ mod tests { assert_eq!(bank.get_balance(&key1), 0); assert_eq!(bank.get_balance(&key2), 0); assert_eq!( - bank.get_signature(&t1.last_id, &t1.signatures[0]), - Some(Status::Complete(Err(BankError::ProgramError( + bank.get_signature_status(&t1.signatures[0]), + Some(Err(BankError::ProgramError( 1, ProgramError::ResultWithNegativeTokens - )))) + ))) ); } @@ -1014,10 +1032,7 @@ mod tests { assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0); assert_eq!(bank.get_balance(&key1), 1); assert_eq!(bank.get_balance(&key2), 1); - assert_eq!( - bank.get_signature(&t1.last_id, &t1.signatures[0]), - Some(Status::Complete(Ok(()))) - ); + assert_eq!(bank.get_signature_status(&t1.signatures[0]), Some(Ok(()))); } // TODO: This test demonstrates that fees are not paid when a program fails. @@ -1047,10 +1062,10 @@ mod tests { assert!(bank.has_signature(&signature)); assert_matches!( bank.get_signature_status(&signature), - Some(Status::Complete(Err(BankError::ProgramError( + Some(Err(BankError::ProgramError( 0, ProgramError::ResultWithNegativeTokens - )))) + ))) ); // The tokens didn't move, but the from address paid the transaction fee. diff --git a/src/bloom.rs b/src/bloom.rs index f81f0730f2..c6b943b6bd 100644 --- a/src/bloom.rs +++ b/src/bloom.rs @@ -45,7 +45,7 @@ impl Bloom { key.hash_at_index(k) % self.bits.len() } pub fn clear(&mut self) { - self.bits.clear(); + self.bits = BitVec::new_fill(false, self.bits.len()); } pub fn add(&mut self, key: &T) { for k in &self.keys { @@ -53,7 +53,7 @@ impl Bloom { self.bits.set(pos, true); } } - pub fn contains(&mut self, key: &T) -> bool { + pub fn contains(&self, key: &T) -> bool { for k in &self.keys { let pos = self.pos(key, *k); if !self.bits.get(pos) { @@ -64,30 +64,6 @@ impl Bloom { } } -//fn to_slice(v: u64) -> [u8; 8] { -// [ -// v as u8, -// (v >> 8) as u8, -// (v >> 16) as u8, -// (v >> 24) as u8, -// (v >> 32) as u8, -// (v >> 40) as u8, -// (v >> 48) as u8, -// (v >> 56) as u8, -// ] -//} - -//fn from_slice(v: &[u8]) -> u64 { -// u64::from(v[0]) -// | u64::from(v[1]) << 8 -// | u64::from(v[2]) << 16 -// | u64::from(v[3]) << 24 -// | u64::from(v[4]) << 32 -// | u64::from(v[5]) << 40 -// | u64::from(v[6]) << 48 -// | u64::from(v[7]) << 56 -//} -// fn slice_hash(slice: &[u8], hash_index: u64) -> u64 { let mut hasher = FnvHasher::with_key(hash_index); hasher.write(slice); @@ -104,15 +80,6 @@ impl> BloomHashIndex for T { mod test { use super::*; use solana_sdk::hash::{hash, Hash}; - // #[test] - // fn test_slice() { - // assert_eq!(from_slice(&to_slice(10)), 10); - // assert_eq!(from_slice(&to_slice(0x7fff7fff)), 0x7fff7fff); - // assert_eq!( - // from_slice(&to_slice(0x7fff7fff7fff7fff)), - // 0x7fff7fff7fff7fff - // ); - // } #[test] fn test_bloom_filter() { diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index ee1e71a78c..918cc81a11 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -371,7 +371,7 @@ mod test { // there is a chance of a false positive with bloom filters // assert that purged value is still in the set // chance of 30 consecutive false positives is 0.1^30 - let mut filter = node.build_crds_filter(&node_crds); + let filter = node.build_crds_filter(&node_crds); assert!(filter.contains(&value_hash)); } diff --git a/src/last_id_queue.rs b/src/last_id_queue.rs new file mode 100644 index 0000000000..4a32062ba4 --- /dev/null +++ b/src/last_id_queue.rs @@ -0,0 +1,213 @@ +use crate::poh_service::NUM_TICKS_PER_SECOND; +use hashbrown::HashMap; +use solana_sdk::hash::Hash; +use solana_sdk::timing::timestamp; + +/// The number of most recent `last_id` values that the bank will track the signatures +/// of. Once the bank discards a `last_id`, it will reject any transactions that use +/// that `last_id` in a transaction. Lowering this value reduces memory consumption, +/// but requires clients to update its `last_id` more frequently. Raising the value +/// lengthens the time a client must wait to be certain a missing transaction will +/// not be processed by the network. +pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120; + +#[derive(Debug, PartialEq, Eq, Clone)] +struct LastIdEntry { + timestamp: u64, + tick_height: u64, +} + +/// Low memory overhead, so can be cloned for every checkpoint +#[derive(Clone)] +pub struct LastIdQueue { + /// updated whenever an id is registered, at each tick ;) + pub tick_height: u64, + + /// last tick to be registered + pub last_id: Option, + + entries: HashMap, +} +impl Default for LastIdQueue { + fn default() -> Self { + Self { + entries: HashMap::new(), + tick_height: 0, + last_id: None, + } + } +} + +impl LastIdQueue { + /// Check if the age of the entry_id is within the max_age + /// return false for any entries with an age equal to or above max_age + pub fn check_entry_id_age(&self, entry_id: Hash, max_age: usize) -> bool { + let entry = self.entries.get(&entry_id); + match entry { + Some(entry) => self.tick_height - entry.tick_height < max_age as u64, + _ => false, + } + } + /// check if entry is valid + pub fn check_entry(&self, entry_id: Hash) -> bool { + self.entries.get(&entry_id).is_some() + } + /// Tell the bank which Entry IDs exist on the ledger. This function + /// assumes subsequent calls correspond to later entries, and will boot + /// the oldest ones once its internal cache is full. Once boot, the + /// bank will reject transactions using that `last_id`. + pub fn register_tick(&mut self, last_id: &Hash) { + self.tick_height += 1; + let tick_height = self.tick_height; + + // this clean up can be deferred until sigs gets larger + // because we verify entry.nth every place we check for validity + if self.entries.len() >= MAX_ENTRY_IDS as usize { + self.entries + .retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64); + } + + self.entries.insert( + *last_id, + LastIdEntry { + tick_height, + timestamp: timestamp(), + }, + ); + + self.last_id = Some(*last_id); + } + + /// Looks through a list of tick heights and stakes, and finds the latest + /// tick that has achieved confirmation + pub fn get_confirmation_timestamp( + &self, + ticks_and_stakes: &mut [(u64, u64)], + supermajority_stake: u64, + ) -> Option { + // Sort by tick height + ticks_and_stakes.sort_by(|a, b| a.0.cmp(&b.0)); + let current_tick_height = self.tick_height; + let mut total = 0; + for (tick_height, stake) in ticks_and_stakes.iter() { + if ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS { + total += stake; + if total > supermajority_stake { + return self.tick_height_to_timestamp(*tick_height); + } + } + } + None + } + + /// Maps a tick height to a timestamp + fn tick_height_to_timestamp(&self, tick_height: u64) -> Option { + for entry in self.entries.values() { + if entry.tick_height == tick_height { + return Some(entry.timestamp); + } + } + None + } + + /// Look through the last_ids and find all the valid ids + /// This is batched to avoid holding the lock for a significant amount of time + /// + /// Return a vec of tuple of (valid index, timestamp) + /// index is into the passed ids slice to avoid copying hashes + pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> { + let mut ret = Vec::new(); + for (i, id) in ids.iter().enumerate() { + if let Some(entry) = self.entries.get(id) { + if self.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 { + ret.push((i, entry.timestamp)); + } + } + } + ret + } + pub fn clear(&mut self) { + self.entries = HashMap::new(); + self.tick_height = 0; + self.last_id = None; + } + /// fork for LastIdQueue is a simple clone + pub fn fork(&self) -> Self { + Self { + entries: self.entries.clone(), + tick_height: self.tick_height, + last_id: self.last_id, + } + } + /// merge for entryq is a swap + pub fn merge_into_root(&mut self, other: Self) { + let (entries, tick_height, last_id) = { (other.entries, other.tick_height, other.last_id) }; + self.entries = entries; + self.tick_height = tick_height; + self.last_id = last_id; + } +} +#[cfg(test)] +mod tests { + use super::*; + use bincode::serialize; + use solana_sdk::hash::hash; + + #[test] + fn test_count_valid_ids() { + let first_id = Hash::default(); + let mut entry_queue = LastIdQueue::default(); + entry_queue.register_tick(&first_id); + let ids: Vec<_> = (0..MAX_ENTRY_IDS) + .map(|i| { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + entry_queue.register_tick(&last_id); + last_id + }) + .collect(); + assert_eq!(entry_queue.count_valid_ids(&[]).len(), 0); + assert_eq!(entry_queue.count_valid_ids(&[first_id]).len(), 0); + for (i, id) in entry_queue.count_valid_ids(&ids).iter().enumerate() { + assert_eq!(id.0, i); + } + } + + #[test] + fn test_register_tick() { + let last_id = Hash::default(); + let mut entry_queue = LastIdQueue::default(); + assert!(!entry_queue.check_entry(last_id)); + entry_queue.register_tick(&last_id); + assert!(entry_queue.check_entry(last_id)); + } + #[test] + fn test_reject_old_last_id() { + let last_id = Hash::default(); + let mut entry_queue = LastIdQueue::default(); + for i in 0..MAX_ENTRY_IDS { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + entry_queue.register_tick(&last_id); + } + // Assert we're no longer able to use the oldest entry ID. + assert!(!entry_queue.check_entry(last_id)); + } + #[test] + fn test_fork() { + let last_id = Hash::default(); + let mut first = LastIdQueue::default(); + assert!(!first.check_entry(last_id)); + first.register_tick(&last_id); + let second = first.fork(); + assert!(second.check_entry(last_id)); + } + #[test] + fn test_merge() { + let last_id = Hash::default(); + let mut first = LastIdQueue::default(); + assert!(!first.check_entry(last_id)); + let mut second = first.fork(); + second.register_tick(&last_id); + first.merge_into_root(second); + assert!(first.check_entry(last_id)); + } +} diff --git a/src/lib.rs b/src/lib.rs index 14693f074b..e6c101eaac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,7 @@ pub mod fullnode; pub mod gen_keys; pub mod genesis_block; pub mod gossip_service; +pub mod last_id_queue; pub mod leader_scheduler; pub mod local_vote_signer_service; pub mod packet; @@ -60,7 +61,7 @@ pub mod runtime; pub mod service; pub mod sigverify; pub mod sigverify_stage; -pub mod status_deque; +pub mod status_cache; pub mod storage_stage; pub mod streamer; pub mod test_tx; diff --git a/src/rpc.rs b/src/rpc.rs index e575aa7b0c..213f47cab2 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -6,7 +6,6 @@ use crate::jsonrpc_core::*; use crate::jsonrpc_http_server::*; use crate::packet::PACKET_DATA_SIZE; use crate::service::Service; -use crate::status_deque::Status; use crate::storage_stage::StorageState; use bincode::{deserialize, serialize}; use bs58; @@ -230,22 +229,13 @@ impl RpcSol for RpcSolImpl { RpcSignatureStatus::SignatureNotFound } else { match res.unwrap() { - Status::Reserved => { - // Report SignatureReserved as SignatureNotFound as SignatureReserved is - // transitory while the bank processes the associated transaction. - RpcSignatureStatus::SignatureNotFound + Ok(_) => RpcSignatureStatus::Confirmed, + Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, + Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, + Err(err) => { + trace!("mapping {:?} to GenericFailure", err); + RpcSignatureStatus::GenericFailure } - Status::Complete(res) => match res { - Ok(_) => RpcSignatureStatus::Confirmed, - Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, - Err(BankError::ProgramError(_, _)) => { - RpcSignatureStatus::ProgramRuntimeError - } - Err(err) => { - trace!("mapping {:?} to GenericFailure", err); - RpcSignatureStatus::GenericFailure - } - }, } } }; @@ -294,7 +284,7 @@ impl RpcSol for RpcSolImpl { .unwrap() .get_signature_status(signature); - if signature_status == Some(Status::Complete(Ok(()))) { + if signature_status == Some(Ok(())) { info!("airdrop signature ok"); return Ok(bs58::encode(signature).into_string()); } else if now.elapsed().as_secs() > 5 { @@ -388,7 +378,7 @@ impl JsonRpcRequestProcessor { let id = self.bank.last_id(); Ok(bs58::encode(id).into_string()) } - pub fn get_signature_status(&self, signature: Signature) -> Option>> { + pub fn get_signature_status(&self, signature: Signature) -> Option> { self.bank.get_signature_status(&signature) } fn get_transaction_count(&self) -> Result { diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 5bc9a6e411..abc1256f4c 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -10,7 +10,6 @@ use crate::jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder}; use crate::rpc::RpcSignatureStatus; use crate::service::Service; -use crate::status_deque::Status; use bs58; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; @@ -322,7 +321,7 @@ impl RpcSolPubSubImpl { } match status.unwrap() { - Status::Complete(Ok(_)) => { + Ok(_) => { sink.notify(Ok(RpcSignatureStatus::Confirmed)) .wait() .unwrap(); diff --git a/src/status_cache.rs b/src/status_cache.rs new file mode 100644 index 0000000000..d182a6ffde --- /dev/null +++ b/src/status_cache.rs @@ -0,0 +1,238 @@ +use crate::bloom::{Bloom, BloomHashIndex}; +use crate::last_id_queue::MAX_ENTRY_IDS; +use hashbrown::HashMap; +use solana_sdk::hash::Hash; +use solana_sdk::signature::Signature; +use std::collections::VecDeque; +use std::ops::{Deref, DerefMut}; + +type FailureMap = HashMap; + +#[derive(Clone)] +pub struct StatusCache { + /// all signatures seen at this checkpoint + signatures: Bloom, + + /// failures + failures: FailureMap, + + /// Merges are empty unless this is the root checkpoint which cannot be unrolled + merges: VecDeque>, +} + +impl Default for StatusCache { + fn default() -> Self { + Self::new(&Hash::default()) + } +} + +impl StatusCache { + pub fn new(last_id: &Hash) -> Self { + let keys = (0..27).map(|i| last_id.hash_at_index(i)).collect(); + Self { + signatures: Bloom::new(38_340_234, keys), + failures: HashMap::new(), + merges: VecDeque::new(), + } + } + fn has_signature_merged(&self, sig: &Signature) -> bool { + for c in &self.merges { + if c.has_signature(sig) { + return true; + } + } + false + } + /// test if a signature is known + pub fn has_signature(&self, sig: &Signature) -> bool { + self.signatures.contains(&sig) || self.has_signature_merged(sig) + } + /// add a signature + pub fn add(&mut self, sig: &Signature) { + self.signatures.add(&sig) + } + /// Save an error status for a signature + pub fn save_failure_status(&mut self, sig: &Signature, err: T) { + assert!(self.has_signature(sig), "sig not found"); + self.failures.insert(*sig, err); + } + /// Forget all signatures. Useful for benchmarking. + pub fn clear(&mut self) { + self.failures.clear(); + self.signatures.clear(); + } + fn get_signature_status_merged(&self, sig: &Signature) -> Option> { + for c in &self.merges { + if c.has_signature(sig) { + return c.get_signature_status(sig); + } + } + None + } + pub fn get_signature_status(&self, sig: &Signature) -> Option> { + if let Some(res) = self.failures.get(sig) { + return Some(Err(res.clone())); + } else if self.signatures.contains(sig) { + return Some(Ok(())); + } + self.get_signature_status_merged(sig) + } + /// like accounts, status cache starts with an new data structure for every checkpoint + /// so only merge is implemented + /// but the merges maintains a history + pub fn merge_into_root(&mut self, other: Self) { + // merges should be empty for every other checkpoint accept the root + // which cannot be rolled back + assert!(other.merges.is_empty()); + self.merges.push_front(other); + if self.merges.len() > MAX_ENTRY_IDS { + //TODO check if this is the right size ^ + self.merges.pop_back(); + } + } + pub fn get_signature_status_all( + checkpoints: &[U], + signature: &Signature, + ) -> Option> + where + U: Deref, + { + for c in checkpoints { + if let Some(status) = c.get_signature_status(signature) { + return Some(status); + } + } + None + } + pub fn has_signature_all(checkpoints: &[U], signature: &Signature) -> bool + where + U: Deref, + { + for c in checkpoints { + if c.has_signature(signature) { + return true; + } + } + false + } + pub fn clear_all(checkpoints: &mut [U]) -> bool + where + U: DerefMut, + { + for c in checkpoints.iter_mut() { + c.clear(); + } + false + } +} +#[cfg(test)] +mod tests { + use super::*; + use crate::bank::BankError; + use solana_sdk::hash::hash; + + type BankStatusCache = StatusCache; + + #[test] + fn test_has_signature() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut status_cache = BankStatusCache::new(&last_id); + assert_eq!(status_cache.has_signature(&sig), false); + assert_eq!(status_cache.get_signature_status(&sig), None,); + status_cache.add(&sig); + assert_eq!(status_cache.has_signature(&sig), true); + assert_eq!(status_cache.get_signature_status(&sig), Some(Ok(())),); + } + + #[test] + fn test_has_signature_checkpoint() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = BankStatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.get_signature_status(&sig), Some(Ok(()))); + let last_id = hash(last_id.as_ref()); + let second = StatusCache::new(&last_id); + let checkpoints = [&second, &first]; + assert_eq!( + BankStatusCache::get_signature_status_all(&checkpoints, &sig), + Some(Ok(())), + ); + assert!(StatusCache::has_signature_all(&checkpoints, &sig)); + } + + #[test] + fn test_has_signature_merged1() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = BankStatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.get_signature_status(&sig), Some(Ok(()))); + let last_id = hash(last_id.as_ref()); + let second = BankStatusCache::new(&last_id); + first.merge_into_root(second); + assert_eq!(first.get_signature_status(&sig), Some(Ok(())),); + assert!(first.has_signature(&sig)); + } + + #[test] + fn test_has_signature_merged2() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = BankStatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.get_signature_status(&sig), Some(Ok(()))); + let last_id = hash(last_id.as_ref()); + let mut second = BankStatusCache::new(&last_id); + second.merge_into_root(first); + assert_eq!(second.get_signature_status(&sig), Some(Ok(())),); + assert!(second.has_signature(&sig)); + } + + #[test] + fn test_failure_status() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = StatusCache::new(&last_id); + first.add(&sig); + first.save_failure_status(&sig, BankError::DuplicateSignature); + assert_eq!(first.has_signature(&sig), true); + assert_eq!( + first.get_signature_status(&sig), + Some(Err(BankError::DuplicateSignature)), + ); + } + + #[test] + fn test_clear_signatures() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = StatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.has_signature(&sig), true); + first.save_failure_status(&sig, BankError::DuplicateSignature); + assert_eq!( + first.get_signature_status(&sig), + Some(Err(BankError::DuplicateSignature)), + ); + first.clear(); + assert_eq!(first.has_signature(&sig), false); + assert_eq!(first.get_signature_status(&sig), None,); + } + #[test] + fn test_clear_signatures_all() { + let sig = Default::default(); + let last_id = hash(Hash::default().as_ref()); + let mut first = StatusCache::new(&last_id); + first.add(&sig); + assert_eq!(first.has_signature(&sig), true); + let mut second = StatusCache::new(&last_id); + let mut checkpoints = [&mut second, &mut first]; + BankStatusCache::clear_all(&mut checkpoints); + assert_eq!( + BankStatusCache::has_signature_all(&checkpoints, &sig), + false + ); + } +} diff --git a/src/status_deque.rs b/src/status_deque.rs deleted file mode 100644 index 8335f71798..0000000000 --- a/src/status_deque.rs +++ /dev/null @@ -1,307 +0,0 @@ -use crate::poh_service::NUM_TICKS_PER_SECOND; -use hashbrown::HashMap; -use solana_sdk::hash::Hash; -use solana_sdk::signature::Signature; -use solana_sdk::timing::timestamp; -use std::result; - -/// The number of most recent `last_id` values that the bank will track the signatures -/// of. Once the bank discards a `last_id`, it will reject any transactions that use -/// that `last_id` in a transaction. Lowering this value reduces memory consumption, -/// but requires clients to update its `last_id` more frequently. Raising the value -/// lengthens the time a client must wait to be certain a missing transaction will -/// not be processed by the network. -pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120; - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Status { - Reserved, - Complete(T), -} - -type StatusMap = HashMap>; -type StatusEntryMap = HashMap>; - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum StatusDequeError { - /// The `Signature` has been seen before. This can occur under normal operation - /// when a UDP packet is duplicated, as a user error from a client not updating - /// its `last_id`, or as a double-spend attack. - DuplicateSignature, - - /// The bank has not seen the given `last_id` or the transaction is too old and - /// the `last_id` has been discarded. - LastIdNotFound, -} - -pub type Result = result::Result; - -/// a record of a tick, from register_tick -#[derive(Clone)] -struct StatusEntry { - /// when the id was registered, according to network time - tick_height: u64, - - /// timestamp when this id was registered, used for stats/confirmation - timestamp: u64, - - /// a map of signature status, used for duplicate detection - statuses: StatusMap, -} - -#[derive(Clone)] -pub struct StatusDeque { - /// A FIFO queue of `last_id` items, where each item is a set of signatures - /// that have been processed using that `last_id`. Rejected `last_id` - /// values are so old that the `last_id` has been pulled out of the queue. - - /// updated whenever an id is registered, at each tick ;) - pub tick_height: u64, - - /// last tick to be registered - pub last_id: Option, - - /// Mapping of hashes to signature sets along with timestamp and what tick_height - /// was when the id was added. The bank uses this data to - /// reject transactions with signatures it's seen before and to reject - /// transactions that are too old (nth is too small) - entries: StatusEntryMap, -} - -impl Default for StatusDeque { - fn default() -> Self { - Self { - tick_height: 0, - last_id: None, - entries: HashMap::new(), - } - } -} - -impl StatusDeque { - pub fn update_signature_status_with_last_id( - &mut self, - signature: &Signature, - result: &T, - last_id: &Hash, - ) { - if let Some(entry) = self.entries.get_mut(last_id) { - entry - .statuses - .insert(*signature, Status::Complete(result.clone())); - } - } - pub fn reserve_signature_with_last_id( - &mut self, - last_id: &Hash, - sig: &Signature, - ) -> Result<()> { - if let Some(entry) = self.entries.get_mut(last_id) { - if self.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 { - return Self::reserve_signature(&mut entry.statuses, sig); - } - } - Err(StatusDequeError::LastIdNotFound) - } - - /// Store the given signature. The bank will reject any transaction with the same signature. - fn reserve_signature(statuses: &mut StatusMap, signature: &Signature) -> Result<()> { - if let Some(_result) = statuses.get(signature) { - return Err(StatusDequeError::DuplicateSignature); - } - statuses.insert(*signature, Status::Reserved); - Ok(()) - } - - /// Forget all signatures. Useful for benchmarking. - pub fn clear_signatures(&mut self) { - for entry in &mut self.entries.values_mut() { - entry.statuses.clear(); - } - } - - /// Check if the age of the entry_id is within the max_age - /// return false for any entries with an age equal to or above max_age - pub fn check_entry_id_age(&self, entry_id: Hash, max_age: usize) -> bool { - let entry = self.entries.get(&entry_id); - - match entry { - Some(entry) => self.tick_height - entry.tick_height < max_age as u64, - _ => false, - } - } - /// Tell the bank which Entry IDs exist on the ledger. This function - /// assumes subsequent calls correspond to later entries, and will boot - /// the oldest ones once its internal cache is full. Once boot, the - /// bank will reject transactions using that `last_id`. - pub fn register_tick(&mut self, last_id: &Hash) { - self.tick_height += 1; - let tick_height = self.tick_height; - - // this clean up can be deferred until sigs gets larger - // because we verify entry.nth every place we check for validity - if self.entries.len() >= MAX_ENTRY_IDS as usize { - self.entries - .retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64); - } - - self.entries.insert( - *last_id, - StatusEntry { - tick_height, - timestamp: timestamp(), - statuses: HashMap::new(), - }, - ); - - self.last_id = Some(*last_id); - } - - /// Looks through a list of tick heights and stakes, and finds the latest - /// tick that has achieved confirmation - pub fn get_confirmation_timestamp( - &self, - ticks_and_stakes: &mut [(u64, u64)], - supermajority_stake: u64, - ) -> Option { - // Sort by tick height - ticks_and_stakes.sort_by(|a, b| b.0.cmp(&a.0)); - let current_tick_height = self.tick_height; - let mut total = 0; - for (tick_height, stake) in ticks_and_stakes.iter() { - if current_tick_height > *tick_height - && ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS - { - total += stake; - if total > supermajority_stake { - return self.tick_height_to_timestamp(*tick_height); - } - } - } - None - } - - /// Maps a tick height to a timestamp - fn tick_height_to_timestamp(&self, tick_height: u64) -> Option { - for entry in self.entries.values() { - if entry.tick_height == tick_height { - return Some(entry.timestamp); - } - } - None - } - - pub fn get_signature_status(&self, signature: &Signature) -> Option> { - for entry in self.entries.values() { - if let Some(res) = entry.statuses.get(signature) { - return Some(res.clone()); - } - } - None - } - pub fn has_signature(&self, signature: &Signature) -> bool { - self.get_signature_status(signature).is_some() - } - - pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option> { - self.entries - .get(last_id) - .and_then(|entry| entry.statuses.get(signature).cloned()) - } -} -#[cfg(test)] -mod tests { - use super::*; - use bincode::serialize; - use solana_sdk::hash::hash; - #[test] - fn test_duplicate_transaction_signature() { - let sig = Default::default(); - let last_id = Default::default(); - let mut status_deque: StatusDeque<()> = StatusDeque::default(); - status_deque.register_tick(&last_id); - assert_eq!( - status_deque.reserve_signature_with_last_id(&last_id, &sig), - Ok(()) - ); - assert_eq!( - status_deque.reserve_signature_with_last_id(&last_id, &sig), - Err(StatusDequeError::DuplicateSignature) - ); - } - - #[test] - fn test_clear_signatures() { - let signature = Signature::default(); - let last_id = Default::default(); - let mut status_deque: StatusDeque<()> = StatusDeque::default(); - status_deque.register_tick(&last_id); - status_deque - .reserve_signature_with_last_id(&last_id, &signature) - .unwrap(); - status_deque.clear_signatures(); - assert_eq!( - status_deque.reserve_signature_with_last_id(&last_id, &signature), - Ok(()) - ); - } - - #[test] - fn test_get_signature_status() { - let signature = Signature::default(); - let last_id = Default::default(); - let mut status_deque: StatusDeque<()> = StatusDeque::default(); - status_deque.register_tick(&last_id); - status_deque - .reserve_signature_with_last_id(&last_id, &signature) - .expect("reserve signature"); - assert_eq!( - status_deque.get_signature_status(&signature), - Some(Status::Reserved) - ); - } - - #[test] - fn test_register_tick() { - let signature = Signature::default(); - let last_id = Default::default(); - let mut status_deque: StatusDeque<()> = StatusDeque::default(); - assert_eq!( - status_deque.reserve_signature_with_last_id(&last_id, &signature), - Err(StatusDequeError::LastIdNotFound) - ); - status_deque.register_tick(&last_id); - assert_eq!( - status_deque.reserve_signature_with_last_id(&last_id, &signature), - Ok(()) - ); - } - - #[test] - fn test_has_signature() { - let signature = Signature::default(); - let last_id = Default::default(); - let mut status_deque: StatusDeque<()> = StatusDeque::default(); - status_deque.register_tick(&last_id); - status_deque - .reserve_signature_with_last_id(&last_id, &signature) - .expect("reserve signature"); - assert!(status_deque.has_signature(&signature)); - } - - #[test] - fn test_reject_old_last_id() { - let signature = Signature::default(); - let last_id = Default::default(); - let mut status_deque: StatusDeque<()> = StatusDeque::default(); - for i in 0..MAX_ENTRY_IDS { - let last_id = hash(&serialize(&i).unwrap()); // Unique hash - status_deque.register_tick(&last_id); - } - // Assert we're no longer able to use the oldest entry ID. - assert_eq!( - status_deque.reserve_signature_with_last_id(&last_id, &signature), - Err(StatusDequeError::LastIdNotFound) - ); - } -} diff --git a/tests/programs.rs b/tests/programs.rs index d886f09e5c..2525c90f41 100644 --- a/tests/programs.rs +++ b/tests/programs.rs @@ -3,7 +3,6 @@ use solana_native_loader; use solana::bank::Bank; use solana::genesis_block::GenesisBlock; -use solana::status_deque::Status; #[cfg(feature = "bpf_c")] use solana_sdk::bpf_loader; use solana_sdk::loader_transaction::LoaderTransaction; @@ -39,10 +38,7 @@ fn create_bpf_path(name: &str) -> PathBuf { fn check_tx_results(bank: &Bank, tx: &Transaction, result: Vec>) { assert_eq!(result.len(), 1); assert_eq!(result[0], Ok(())); - assert_eq!( - bank.get_signature(&tx.last_id, &tx.signatures[0]), - Some(Status::Complete(Ok(()))) - ); + assert_eq!(bank.get_signature_status(&tx.signatures[0]), Some(Ok(()))); } struct Loader {