StatusDeque split into separate objects with their own root checkpoint strategy (#2613)
Split up StatusDeque into different modules * LastIdQueue tracks last_ids * StatusCache keeps track of signature statuses * StatusCache stores success as a bit in a bloom filter * Overhead for 1m Ok transactions is 4mb in memory * Less concurrency between the objects, last_id and status_cache are read and written to at different points in the pipeline * Each object has its own strategy for merging into the root checkpoint
This commit is contained in:
parent
609e915169
commit
2754ceec60
|
@ -4,7 +4,7 @@ extern crate test;
|
||||||
|
|
||||||
use solana::bank::*;
|
use solana::bank::*;
|
||||||
use solana::genesis_block::GenesisBlock;
|
use solana::genesis_block::GenesisBlock;
|
||||||
use solana::status_deque::MAX_ENTRY_IDS;
|
use solana::last_id_queue::MAX_ENTRY_IDS;
|
||||||
use solana_sdk::hash::hash;
|
use solana_sdk::hash::hash;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
use solana_sdk::system_transaction::SystemTransaction;
|
use solana_sdk::system_transaction::SystemTransaction;
|
||||||
|
|
|
@ -8,8 +8,8 @@ use solana::bank::Bank;
|
||||||
use solana::banking_stage::BankingStage;
|
use solana::banking_stage::BankingStage;
|
||||||
use solana::entry::Entry;
|
use solana::entry::Entry;
|
||||||
use solana::genesis_block::GenesisBlock;
|
use solana::genesis_block::GenesisBlock;
|
||||||
|
use solana::last_id_queue::MAX_ENTRY_IDS;
|
||||||
use solana::packet::to_packets_chunked;
|
use solana::packet::to_packets_chunked;
|
||||||
use solana::status_deque::MAX_ENTRY_IDS;
|
|
||||||
use solana_sdk::hash::hash;
|
use solana_sdk::hash::hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
||||||
|
|
139
src/bank.rs
139
src/bank.rs
|
@ -8,10 +8,11 @@ use crate::counter::Counter;
|
||||||
use crate::entry::Entry;
|
use crate::entry::Entry;
|
||||||
use crate::entry::EntrySlice;
|
use crate::entry::EntrySlice;
|
||||||
use crate::genesis_block::GenesisBlock;
|
use crate::genesis_block::GenesisBlock;
|
||||||
|
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
|
||||||
use crate::leader_scheduler::LeaderScheduler;
|
use crate::leader_scheduler::LeaderScheduler;
|
||||||
use crate::poh_recorder::PohRecorder;
|
use crate::poh_recorder::PohRecorder;
|
||||||
use crate::runtime::{self, RuntimeError};
|
use crate::runtime::{self, RuntimeError};
|
||||||
use crate::status_deque::{Status, StatusDeque, StatusDequeError, MAX_ENTRY_IDS};
|
use crate::status_cache::StatusCache;
|
||||||
use bincode::deserialize;
|
use bincode::deserialize;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::Level;
|
use log::Level;
|
||||||
|
@ -96,12 +97,17 @@ impl BankSubscriptions for LocalSubscriptions {
|
||||||
fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {}
|
fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BankStatusCache = StatusCache<BankError>;
|
||||||
|
|
||||||
/// Manager for the state of all accounts and programs after processing its entries.
|
/// Manager for the state of all accounts and programs after processing its entries.
|
||||||
pub struct Bank {
|
pub struct Bank {
|
||||||
pub accounts: Accounts,
|
pub accounts: Accounts,
|
||||||
|
|
||||||
|
/// A cache of signature statuses
|
||||||
|
status_cache: RwLock<BankStatusCache>,
|
||||||
|
|
||||||
/// FIFO queue of `last_id` items
|
/// FIFO queue of `last_id` items
|
||||||
last_ids: RwLock<StatusDeque<Result<()>>>,
|
last_id_queue: RwLock<LastIdQueue>,
|
||||||
|
|
||||||
// The latest confirmation time for the network
|
// The latest confirmation time for the network
|
||||||
confirmation_time: AtomicUsize,
|
confirmation_time: AtomicUsize,
|
||||||
|
@ -117,7 +123,8 @@ impl Default for Bank {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Bank {
|
Bank {
|
||||||
accounts: Accounts::default(),
|
accounts: Accounts::default(),
|
||||||
last_ids: RwLock::new(StatusDeque::default()),
|
last_id_queue: RwLock::new(LastIdQueue::default()),
|
||||||
|
status_cache: RwLock::new(BankStatusCache::default()),
|
||||||
confirmation_time: AtomicUsize::new(std::usize::MAX),
|
confirmation_time: AtomicUsize::new(std::usize::MAX),
|
||||||
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
|
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
|
||||||
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
|
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
|
||||||
|
@ -138,9 +145,12 @@ impl Bank {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn copy_for_tpu(&self) -> Self {
|
pub fn copy_for_tpu(&self) -> Self {
|
||||||
|
let mut status_cache = BankStatusCache::default();
|
||||||
|
status_cache.merge_into_root(self.status_cache.read().unwrap().clone());
|
||||||
Self {
|
Self {
|
||||||
accounts: self.accounts.copy_for_tpu(),
|
accounts: self.accounts.copy_for_tpu(),
|
||||||
last_ids: RwLock::new(self.last_ids.read().unwrap().clone()),
|
status_cache: RwLock::new(status_cache),
|
||||||
|
last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
|
||||||
confirmation_time: AtomicUsize::new(self.confirmation_time()),
|
confirmation_time: AtomicUsize::new(self.confirmation_time()),
|
||||||
leader_scheduler: self.leader_scheduler.clone(),
|
leader_scheduler: self.leader_scheduler.clone(),
|
||||||
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
|
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
|
||||||
|
@ -255,7 +265,7 @@ impl Bank {
|
||||||
|
|
||||||
/// Return the last entry ID registered.
|
/// Return the last entry ID registered.
|
||||||
pub fn last_id(&self) -> Hash {
|
pub fn last_id(&self) -> Hash {
|
||||||
self.last_ids
|
self.last_id_queue
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.last_id
|
.last_id
|
||||||
|
@ -291,19 +301,32 @@ impl Bank {
|
||||||
|
|
||||||
/// Forget all signatures. Useful for benchmarking.
|
/// Forget all signatures. Useful for benchmarking.
|
||||||
pub fn clear_signatures(&self) {
|
pub fn clear_signatures(&self) {
|
||||||
self.last_ids.write().unwrap().clear_signatures();
|
self.status_cache.write().unwrap().clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
|
fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) {
|
||||||
let mut last_ids = self.last_ids.write().unwrap();
|
|
||||||
for (i, tx) in txs.iter().enumerate() {
|
for (i, tx) in txs.iter().enumerate() {
|
||||||
last_ids.update_signature_status_with_last_id(&tx.signatures[0], &res[i], &tx.last_id);
|
|
||||||
self.subscriptions
|
self.subscriptions
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.check_signature(&tx.signatures[0], &res[i]);
|
.check_signature(&tx.signatures[0], &res[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
|
||||||
|
let mut status_cache = self.status_cache.write().unwrap();
|
||||||
|
for (i, tx) in txs.iter().enumerate() {
|
||||||
|
match &res[i] {
|
||||||
|
Ok(_) => status_cache.add(&tx.signatures[0]),
|
||||||
|
Err(BankError::LastIdNotFound) => (),
|
||||||
|
Err(BankError::DuplicateSignature) => (),
|
||||||
|
Err(BankError::AccountNotFound) => (),
|
||||||
|
Err(e) => {
|
||||||
|
status_cache.add(&tx.signatures[0]);
|
||||||
|
status_cache.save_failure_status(&tx.signatures[0], e.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Looks through a list of tick heights and stakes, and finds the latest
|
/// Looks through a list of tick heights and stakes, and finds the latest
|
||||||
/// tick that has achieved confirmation
|
/// tick that has achieved confirmation
|
||||||
|
@ -312,7 +335,7 @@ impl Bank {
|
||||||
ticks_and_stakes: &mut [(u64, u64)],
|
ticks_and_stakes: &mut [(u64, u64)],
|
||||||
supermajority_stake: u64,
|
supermajority_stake: u64,
|
||||||
) -> Option<u64> {
|
) -> Option<u64> {
|
||||||
let last_ids = self.last_ids.read().unwrap();
|
let last_ids = self.last_id_queue.read().unwrap();
|
||||||
last_ids.get_confirmation_timestamp(ticks_and_stakes, supermajority_stake)
|
last_ids.get_confirmation_timestamp(ticks_and_stakes, supermajority_stake)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,9 +344,9 @@ impl Bank {
|
||||||
/// the oldest ones once its internal cache is full. Once boot, the
|
/// the oldest ones once its internal cache is full. Once boot, the
|
||||||
/// bank will reject transactions using that `last_id`.
|
/// bank will reject transactions using that `last_id`.
|
||||||
pub fn register_tick(&self, last_id: &Hash) {
|
pub fn register_tick(&self, last_id: &Hash) {
|
||||||
let mut last_ids = self.last_ids.write().unwrap();
|
let mut last_id_queue = self.last_id_queue.write().unwrap();
|
||||||
inc_new_counter_info!("bank-register_tick-registered", 1);
|
inc_new_counter_info!("bank-register_tick-registered", 1);
|
||||||
last_ids.register_tick(last_id)
|
last_id_queue.register_tick(last_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
|
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
|
||||||
|
@ -436,33 +459,39 @@ impl Bank {
|
||||||
) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
|
) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
|
||||||
Accounts::load_accounts(&[&self.accounts], txs, results, error_counters)
|
Accounts::load_accounts(&[&self.accounts], txs, results, error_counters)
|
||||||
}
|
}
|
||||||
fn check_signatures(
|
fn check_age(
|
||||||
&self,
|
&self,
|
||||||
txs: &[Transaction],
|
txs: &[Transaction],
|
||||||
lock_results: Vec<Result<()>>,
|
lock_results: Vec<Result<()>>,
|
||||||
max_age: usize,
|
max_age: usize,
|
||||||
error_counters: &mut ErrorCounters,
|
error_counters: &mut ErrorCounters,
|
||||||
) -> Vec<Result<()>> {
|
) -> Vec<Result<()>> {
|
||||||
let mut last_ids = self.last_ids.write().unwrap();
|
let last_ids = self.last_id_queue.read().unwrap();
|
||||||
txs.iter()
|
txs.iter()
|
||||||
.zip(lock_results.into_iter())
|
.zip(lock_results.into_iter())
|
||||||
.map(|(tx, lock_res)| {
|
.map(|(tx, lock_res)| {
|
||||||
if lock_res.is_ok() {
|
if lock_res.is_ok() && !last_ids.check_entry_id_age(tx.last_id, max_age) {
|
||||||
let r = if !last_ids.check_entry_id_age(tx.last_id, max_age) {
|
error_counters.reserve_last_id += 1;
|
||||||
Err(StatusDequeError::LastIdNotFound)
|
Err(BankError::LastIdNotFound)
|
||||||
} else {
|
} else {
|
||||||
last_ids.reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0])
|
lock_res
|
||||||
};
|
}
|
||||||
r.map_err(|err| match err {
|
})
|
||||||
StatusDequeError::LastIdNotFound => {
|
.collect()
|
||||||
error_counters.reserve_last_id += 1;
|
}
|
||||||
BankError::LastIdNotFound
|
fn check_signatures(
|
||||||
}
|
&self,
|
||||||
StatusDequeError::DuplicateSignature => {
|
txs: &[Transaction],
|
||||||
error_counters.duplicate_signature += 1;
|
lock_results: Vec<Result<()>>,
|
||||||
BankError::DuplicateSignature
|
error_counters: &mut ErrorCounters,
|
||||||
}
|
) -> Vec<Result<()>> {
|
||||||
})
|
let status_cache = self.status_cache.read().unwrap();
|
||||||
|
txs.iter()
|
||||||
|
.zip(lock_results.into_iter())
|
||||||
|
.map(|(tx, lock_res)| {
|
||||||
|
if lock_res.is_ok() && status_cache.has_signature(&tx.signatures[0]) {
|
||||||
|
error_counters.duplicate_signature += 1;
|
||||||
|
Err(BankError::DuplicateSignature)
|
||||||
} else {
|
} else {
|
||||||
lock_res
|
lock_res
|
||||||
}
|
}
|
||||||
|
@ -482,7 +511,8 @@ impl Bank {
|
||||||
debug!("processing transactions: {}", txs.len());
|
debug!("processing transactions: {}", txs.len());
|
||||||
let mut error_counters = ErrorCounters::default();
|
let mut error_counters = ErrorCounters::default();
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let sig_results = self.check_signatures(txs, lock_results, max_age, &mut error_counters);
|
let age_results = self.check_age(txs, lock_results, max_age, &mut error_counters);
|
||||||
|
let sig_results = self.check_signatures(txs, age_results, &mut error_counters);
|
||||||
let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters);
|
let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters);
|
||||||
let tick_height = self.tick_height();
|
let tick_height = self.tick_height();
|
||||||
|
|
||||||
|
@ -583,6 +613,7 @@ impl Bank {
|
||||||
txs.len(),
|
txs.len(),
|
||||||
);
|
);
|
||||||
self.update_transaction_statuses(txs, &executed);
|
self.update_transaction_statuses(txs, &executed);
|
||||||
|
self.update_subscriptions(txs, &executed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a batch of transactions.
|
/// Process a batch of transactions.
|
||||||
|
@ -736,7 +767,8 @@ impl Bank {
|
||||||
{
|
{
|
||||||
let mut entry_height = 0;
|
let mut entry_height = 0;
|
||||||
let mut last_id = genesis_block.last_id();
|
let mut last_id = genesis_block.last_id();
|
||||||
self.last_ids = RwLock::new(StatusDeque::default());
|
self.last_id_queue = RwLock::new(LastIdQueue::default());
|
||||||
|
self.status_cache = RwLock::new(BankStatusCache::default());
|
||||||
|
|
||||||
// Ledger verification needs to be parallelized, but we can't pull the whole
|
// Ledger verification needs to be parallelized, but we can't pull the whole
|
||||||
// thing into memory. We therefore chunk it.
|
// thing into memory. We therefore chunk it.
|
||||||
|
@ -795,26 +827,15 @@ impl Bank {
|
||||||
self.accounts.transaction_count()
|
self.accounts.transaction_count()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_signature_status(&self, signature: &Signature) -> Option<Status<Result<()>>> {
|
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
|
||||||
self.last_ids
|
self.status_cache
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_signature_status(signature)
|
.get_signature_status(signature)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn has_signature(&self, signature: &Signature) -> bool {
|
pub fn has_signature(&self, signature: &Signature) -> bool {
|
||||||
self.last_ids.read().unwrap().has_signature(signature)
|
self.status_cache.read().unwrap().has_signature(signature)
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_signature(
|
|
||||||
&self,
|
|
||||||
last_id: &Hash,
|
|
||||||
signature: &Signature,
|
|
||||||
) -> Option<Status<Result<()>>> {
|
|
||||||
self.last_ids
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.get_signature(last_id, signature)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Hash the `accounts` HashMap. This represents a validator's interpretation
|
/// Hash the `accounts` HashMap. This represents a validator's interpretation
|
||||||
|
@ -862,7 +883,7 @@ impl Bank {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tick_height(&self) -> u64 {
|
pub fn tick_height(&self) -> u64 {
|
||||||
self.last_ids.read().unwrap().tick_height
|
self.last_id_queue.read().unwrap().tick_height
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -936,14 +957,11 @@ mod tests {
|
||||||
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
|
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
|
||||||
assert_eq!(bank.get_balance(&key1), 1);
|
assert_eq!(bank.get_balance(&key1), 1);
|
||||||
assert_eq!(bank.get_balance(&key2), 0);
|
assert_eq!(bank.get_balance(&key2), 0);
|
||||||
assert_eq!(
|
assert_eq!(bank.get_signature_status(&t1.signatures[0]), Some(Ok(())));
|
||||||
bank.get_signature(&t1.last_id, &t1.signatures[0]),
|
|
||||||
Some(Status::Complete(Ok(())))
|
|
||||||
);
|
|
||||||
// TODO: Transactions that fail to pay a fee could be dropped silently
|
// TODO: Transactions that fail to pay a fee could be dropped silently
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
bank.get_signature(&t2.last_id, &t2.signatures[0]),
|
bank.get_signature_status(&t2.signatures[0]),
|
||||||
Some(Status::Complete(Err(BankError::AccountInUse)))
|
Some(Err(BankError::AccountInUse))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -988,11 +1006,11 @@ mod tests {
|
||||||
assert_eq!(bank.get_balance(&key1), 0);
|
assert_eq!(bank.get_balance(&key1), 0);
|
||||||
assert_eq!(bank.get_balance(&key2), 0);
|
assert_eq!(bank.get_balance(&key2), 0);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
bank.get_signature(&t1.last_id, &t1.signatures[0]),
|
bank.get_signature_status(&t1.signatures[0]),
|
||||||
Some(Status::Complete(Err(BankError::ProgramError(
|
Some(Err(BankError::ProgramError(
|
||||||
1,
|
1,
|
||||||
ProgramError::ResultWithNegativeTokens
|
ProgramError::ResultWithNegativeTokens
|
||||||
))))
|
)))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1014,10 +1032,7 @@ mod tests {
|
||||||
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
|
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
|
||||||
assert_eq!(bank.get_balance(&key1), 1);
|
assert_eq!(bank.get_balance(&key1), 1);
|
||||||
assert_eq!(bank.get_balance(&key2), 1);
|
assert_eq!(bank.get_balance(&key2), 1);
|
||||||
assert_eq!(
|
assert_eq!(bank.get_signature_status(&t1.signatures[0]), Some(Ok(())));
|
||||||
bank.get_signature(&t1.last_id, &t1.signatures[0]),
|
|
||||||
Some(Status::Complete(Ok(())))
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This test demonstrates that fees are not paid when a program fails.
|
// TODO: This test demonstrates that fees are not paid when a program fails.
|
||||||
|
@ -1047,10 +1062,10 @@ mod tests {
|
||||||
assert!(bank.has_signature(&signature));
|
assert!(bank.has_signature(&signature));
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
bank.get_signature_status(&signature),
|
bank.get_signature_status(&signature),
|
||||||
Some(Status::Complete(Err(BankError::ProgramError(
|
Some(Err(BankError::ProgramError(
|
||||||
0,
|
0,
|
||||||
ProgramError::ResultWithNegativeTokens
|
ProgramError::ResultWithNegativeTokens
|
||||||
))))
|
)))
|
||||||
);
|
);
|
||||||
|
|
||||||
// The tokens didn't move, but the from address paid the transaction fee.
|
// The tokens didn't move, but the from address paid the transaction fee.
|
||||||
|
|
37
src/bloom.rs
37
src/bloom.rs
|
@ -45,7 +45,7 @@ impl<T: BloomHashIndex> Bloom<T> {
|
||||||
key.hash_at_index(k) % self.bits.len()
|
key.hash_at_index(k) % self.bits.len()
|
||||||
}
|
}
|
||||||
pub fn clear(&mut self) {
|
pub fn clear(&mut self) {
|
||||||
self.bits.clear();
|
self.bits = BitVec::new_fill(false, self.bits.len());
|
||||||
}
|
}
|
||||||
pub fn add(&mut self, key: &T) {
|
pub fn add(&mut self, key: &T) {
|
||||||
for k in &self.keys {
|
for k in &self.keys {
|
||||||
|
@ -53,7 +53,7 @@ impl<T: BloomHashIndex> Bloom<T> {
|
||||||
self.bits.set(pos, true);
|
self.bits.set(pos, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn contains(&mut self, key: &T) -> bool {
|
pub fn contains(&self, key: &T) -> bool {
|
||||||
for k in &self.keys {
|
for k in &self.keys {
|
||||||
let pos = self.pos(key, *k);
|
let pos = self.pos(key, *k);
|
||||||
if !self.bits.get(pos) {
|
if !self.bits.get(pos) {
|
||||||
|
@ -64,30 +64,6 @@ impl<T: BloomHashIndex> Bloom<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//fn to_slice(v: u64) -> [u8; 8] {
|
|
||||||
// [
|
|
||||||
// v as u8,
|
|
||||||
// (v >> 8) as u8,
|
|
||||||
// (v >> 16) as u8,
|
|
||||||
// (v >> 24) as u8,
|
|
||||||
// (v >> 32) as u8,
|
|
||||||
// (v >> 40) as u8,
|
|
||||||
// (v >> 48) as u8,
|
|
||||||
// (v >> 56) as u8,
|
|
||||||
// ]
|
|
||||||
//}
|
|
||||||
|
|
||||||
//fn from_slice(v: &[u8]) -> u64 {
|
|
||||||
// u64::from(v[0])
|
|
||||||
// | u64::from(v[1]) << 8
|
|
||||||
// | u64::from(v[2]) << 16
|
|
||||||
// | u64::from(v[3]) << 24
|
|
||||||
// | u64::from(v[4]) << 32
|
|
||||||
// | u64::from(v[5]) << 40
|
|
||||||
// | u64::from(v[6]) << 48
|
|
||||||
// | u64::from(v[7]) << 56
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
fn slice_hash(slice: &[u8], hash_index: u64) -> u64 {
|
fn slice_hash(slice: &[u8], hash_index: u64) -> u64 {
|
||||||
let mut hasher = FnvHasher::with_key(hash_index);
|
let mut hasher = FnvHasher::with_key(hash_index);
|
||||||
hasher.write(slice);
|
hasher.write(slice);
|
||||||
|
@ -104,15 +80,6 @@ impl<T: AsRef<[u8]>> BloomHashIndex for T {
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use solana_sdk::hash::{hash, Hash};
|
use solana_sdk::hash::{hash, Hash};
|
||||||
// #[test]
|
|
||||||
// fn test_slice() {
|
|
||||||
// assert_eq!(from_slice(&to_slice(10)), 10);
|
|
||||||
// assert_eq!(from_slice(&to_slice(0x7fff7fff)), 0x7fff7fff);
|
|
||||||
// assert_eq!(
|
|
||||||
// from_slice(&to_slice(0x7fff7fff7fff7fff)),
|
|
||||||
// 0x7fff7fff7fff7fff
|
|
||||||
// );
|
|
||||||
// }
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bloom_filter() {
|
fn test_bloom_filter() {
|
||||||
|
|
|
@ -371,7 +371,7 @@ mod test {
|
||||||
// there is a chance of a false positive with bloom filters
|
// there is a chance of a false positive with bloom filters
|
||||||
// assert that purged value is still in the set
|
// assert that purged value is still in the set
|
||||||
// chance of 30 consecutive false positives is 0.1^30
|
// chance of 30 consecutive false positives is 0.1^30
|
||||||
let mut filter = node.build_crds_filter(&node_crds);
|
let filter = node.build_crds_filter(&node_crds);
|
||||||
assert!(filter.contains(&value_hash));
|
assert!(filter.contains(&value_hash));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,213 @@
|
||||||
|
use crate::poh_service::NUM_TICKS_PER_SECOND;
|
||||||
|
use hashbrown::HashMap;
|
||||||
|
use solana_sdk::hash::Hash;
|
||||||
|
use solana_sdk::timing::timestamp;
|
||||||
|
|
||||||
|
/// The number of most recent `last_id` values that the bank will track the signatures
|
||||||
|
/// of. Once the bank discards a `last_id`, it will reject any transactions that use
|
||||||
|
/// that `last_id` in a transaction. Lowering this value reduces memory consumption,
|
||||||
|
/// but requires clients to update its `last_id` more frequently. Raising the value
|
||||||
|
/// lengthens the time a client must wait to be certain a missing transaction will
|
||||||
|
/// not be processed by the network.
|
||||||
|
pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120;
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||||
|
struct LastIdEntry {
|
||||||
|
timestamp: u64,
|
||||||
|
tick_height: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Low memory overhead, so can be cloned for every checkpoint
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct LastIdQueue {
|
||||||
|
/// updated whenever an id is registered, at each tick ;)
|
||||||
|
pub tick_height: u64,
|
||||||
|
|
||||||
|
/// last tick to be registered
|
||||||
|
pub last_id: Option<Hash>,
|
||||||
|
|
||||||
|
entries: HashMap<Hash, LastIdEntry>,
|
||||||
|
}
|
||||||
|
impl Default for LastIdQueue {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
entries: HashMap::new(),
|
||||||
|
tick_height: 0,
|
||||||
|
last_id: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LastIdQueue {
|
||||||
|
/// Check if the age of the entry_id is within the max_age
|
||||||
|
/// return false for any entries with an age equal to or above max_age
|
||||||
|
pub fn check_entry_id_age(&self, entry_id: Hash, max_age: usize) -> bool {
|
||||||
|
let entry = self.entries.get(&entry_id);
|
||||||
|
match entry {
|
||||||
|
Some(entry) => self.tick_height - entry.tick_height < max_age as u64,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// check if entry is valid
|
||||||
|
pub fn check_entry(&self, entry_id: Hash) -> bool {
|
||||||
|
self.entries.get(&entry_id).is_some()
|
||||||
|
}
|
||||||
|
/// Tell the bank which Entry IDs exist on the ledger. This function
|
||||||
|
/// assumes subsequent calls correspond to later entries, and will boot
|
||||||
|
/// the oldest ones once its internal cache is full. Once boot, the
|
||||||
|
/// bank will reject transactions using that `last_id`.
|
||||||
|
pub fn register_tick(&mut self, last_id: &Hash) {
|
||||||
|
self.tick_height += 1;
|
||||||
|
let tick_height = self.tick_height;
|
||||||
|
|
||||||
|
// this clean up can be deferred until sigs gets larger
|
||||||
|
// because we verify entry.nth every place we check for validity
|
||||||
|
if self.entries.len() >= MAX_ENTRY_IDS as usize {
|
||||||
|
self.entries
|
||||||
|
.retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.entries.insert(
|
||||||
|
*last_id,
|
||||||
|
LastIdEntry {
|
||||||
|
tick_height,
|
||||||
|
timestamp: timestamp(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
self.last_id = Some(*last_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Looks through a list of tick heights and stakes, and finds the latest
|
||||||
|
/// tick that has achieved confirmation
|
||||||
|
pub fn get_confirmation_timestamp(
|
||||||
|
&self,
|
||||||
|
ticks_and_stakes: &mut [(u64, u64)],
|
||||||
|
supermajority_stake: u64,
|
||||||
|
) -> Option<u64> {
|
||||||
|
// Sort by tick height
|
||||||
|
ticks_and_stakes.sort_by(|a, b| a.0.cmp(&b.0));
|
||||||
|
let current_tick_height = self.tick_height;
|
||||||
|
let mut total = 0;
|
||||||
|
for (tick_height, stake) in ticks_and_stakes.iter() {
|
||||||
|
if ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS {
|
||||||
|
total += stake;
|
||||||
|
if total > supermajority_stake {
|
||||||
|
return self.tick_height_to_timestamp(*tick_height);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Maps a tick height to a timestamp
|
||||||
|
fn tick_height_to_timestamp(&self, tick_height: u64) -> Option<u64> {
|
||||||
|
for entry in self.entries.values() {
|
||||||
|
if entry.tick_height == tick_height {
|
||||||
|
return Some(entry.timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Look through the last_ids and find all the valid ids
|
||||||
|
/// This is batched to avoid holding the lock for a significant amount of time
|
||||||
|
///
|
||||||
|
/// Return a vec of tuple of (valid index, timestamp)
|
||||||
|
/// index is into the passed ids slice to avoid copying hashes
|
||||||
|
pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> {
|
||||||
|
let mut ret = Vec::new();
|
||||||
|
for (i, id) in ids.iter().enumerate() {
|
||||||
|
if let Some(entry) = self.entries.get(id) {
|
||||||
|
if self.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 {
|
||||||
|
ret.push((i, entry.timestamp));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.entries = HashMap::new();
|
||||||
|
self.tick_height = 0;
|
||||||
|
self.last_id = None;
|
||||||
|
}
|
||||||
|
/// fork for LastIdQueue is a simple clone
|
||||||
|
pub fn fork(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
entries: self.entries.clone(),
|
||||||
|
tick_height: self.tick_height,
|
||||||
|
last_id: self.last_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// merge for entryq is a swap
|
||||||
|
pub fn merge_into_root(&mut self, other: Self) {
|
||||||
|
let (entries, tick_height, last_id) = { (other.entries, other.tick_height, other.last_id) };
|
||||||
|
self.entries = entries;
|
||||||
|
self.tick_height = tick_height;
|
||||||
|
self.last_id = last_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use bincode::serialize;
|
||||||
|
use solana_sdk::hash::hash;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_count_valid_ids() {
|
||||||
|
let first_id = Hash::default();
|
||||||
|
let mut entry_queue = LastIdQueue::default();
|
||||||
|
entry_queue.register_tick(&first_id);
|
||||||
|
let ids: Vec<_> = (0..MAX_ENTRY_IDS)
|
||||||
|
.map(|i| {
|
||||||
|
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
||||||
|
entry_queue.register_tick(&last_id);
|
||||||
|
last_id
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
assert_eq!(entry_queue.count_valid_ids(&[]).len(), 0);
|
||||||
|
assert_eq!(entry_queue.count_valid_ids(&[first_id]).len(), 0);
|
||||||
|
for (i, id) in entry_queue.count_valid_ids(&ids).iter().enumerate() {
|
||||||
|
assert_eq!(id.0, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_register_tick() {
|
||||||
|
let last_id = Hash::default();
|
||||||
|
let mut entry_queue = LastIdQueue::default();
|
||||||
|
assert!(!entry_queue.check_entry(last_id));
|
||||||
|
entry_queue.register_tick(&last_id);
|
||||||
|
assert!(entry_queue.check_entry(last_id));
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_reject_old_last_id() {
|
||||||
|
let last_id = Hash::default();
|
||||||
|
let mut entry_queue = LastIdQueue::default();
|
||||||
|
for i in 0..MAX_ENTRY_IDS {
|
||||||
|
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
||||||
|
entry_queue.register_tick(&last_id);
|
||||||
|
}
|
||||||
|
// Assert we're no longer able to use the oldest entry ID.
|
||||||
|
assert!(!entry_queue.check_entry(last_id));
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_fork() {
|
||||||
|
let last_id = Hash::default();
|
||||||
|
let mut first = LastIdQueue::default();
|
||||||
|
assert!(!first.check_entry(last_id));
|
||||||
|
first.register_tick(&last_id);
|
||||||
|
let second = first.fork();
|
||||||
|
assert!(second.check_entry(last_id));
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_merge() {
|
||||||
|
let last_id = Hash::default();
|
||||||
|
let mut first = LastIdQueue::default();
|
||||||
|
assert!(!first.check_entry(last_id));
|
||||||
|
let mut second = first.fork();
|
||||||
|
second.register_tick(&last_id);
|
||||||
|
first.merge_into_root(second);
|
||||||
|
assert!(first.check_entry(last_id));
|
||||||
|
}
|
||||||
|
}
|
|
@ -41,6 +41,7 @@ pub mod fullnode;
|
||||||
pub mod gen_keys;
|
pub mod gen_keys;
|
||||||
pub mod genesis_block;
|
pub mod genesis_block;
|
||||||
pub mod gossip_service;
|
pub mod gossip_service;
|
||||||
|
pub mod last_id_queue;
|
||||||
pub mod leader_scheduler;
|
pub mod leader_scheduler;
|
||||||
pub mod local_vote_signer_service;
|
pub mod local_vote_signer_service;
|
||||||
pub mod packet;
|
pub mod packet;
|
||||||
|
@ -60,7 +61,7 @@ pub mod runtime;
|
||||||
pub mod service;
|
pub mod service;
|
||||||
pub mod sigverify;
|
pub mod sigverify;
|
||||||
pub mod sigverify_stage;
|
pub mod sigverify_stage;
|
||||||
pub mod status_deque;
|
pub mod status_cache;
|
||||||
pub mod storage_stage;
|
pub mod storage_stage;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
pub mod test_tx;
|
pub mod test_tx;
|
||||||
|
|
26
src/rpc.rs
26
src/rpc.rs
|
@ -6,7 +6,6 @@ use crate::jsonrpc_core::*;
|
||||||
use crate::jsonrpc_http_server::*;
|
use crate::jsonrpc_http_server::*;
|
||||||
use crate::packet::PACKET_DATA_SIZE;
|
use crate::packet::PACKET_DATA_SIZE;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::status_deque::Status;
|
|
||||||
use crate::storage_stage::StorageState;
|
use crate::storage_stage::StorageState;
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize};
|
||||||
use bs58;
|
use bs58;
|
||||||
|
@ -230,22 +229,13 @@ impl RpcSol for RpcSolImpl {
|
||||||
RpcSignatureStatus::SignatureNotFound
|
RpcSignatureStatus::SignatureNotFound
|
||||||
} else {
|
} else {
|
||||||
match res.unwrap() {
|
match res.unwrap() {
|
||||||
Status::Reserved => {
|
Ok(_) => RpcSignatureStatus::Confirmed,
|
||||||
// Report SignatureReserved as SignatureNotFound as SignatureReserved is
|
Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse,
|
||||||
// transitory while the bank processes the associated transaction.
|
Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError,
|
||||||
RpcSignatureStatus::SignatureNotFound
|
Err(err) => {
|
||||||
|
trace!("mapping {:?} to GenericFailure", err);
|
||||||
|
RpcSignatureStatus::GenericFailure
|
||||||
}
|
}
|
||||||
Status::Complete(res) => match res {
|
|
||||||
Ok(_) => RpcSignatureStatus::Confirmed,
|
|
||||||
Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse,
|
|
||||||
Err(BankError::ProgramError(_, _)) => {
|
|
||||||
RpcSignatureStatus::ProgramRuntimeError
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
trace!("mapping {:?} to GenericFailure", err);
|
|
||||||
RpcSignatureStatus::GenericFailure
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -294,7 +284,7 @@ impl RpcSol for RpcSolImpl {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_signature_status(signature);
|
.get_signature_status(signature);
|
||||||
|
|
||||||
if signature_status == Some(Status::Complete(Ok(()))) {
|
if signature_status == Some(Ok(())) {
|
||||||
info!("airdrop signature ok");
|
info!("airdrop signature ok");
|
||||||
return Ok(bs58::encode(signature).into_string());
|
return Ok(bs58::encode(signature).into_string());
|
||||||
} else if now.elapsed().as_secs() > 5 {
|
} else if now.elapsed().as_secs() > 5 {
|
||||||
|
@ -388,7 +378,7 @@ impl JsonRpcRequestProcessor {
|
||||||
let id = self.bank.last_id();
|
let id = self.bank.last_id();
|
||||||
Ok(bs58::encode(id).into_string())
|
Ok(bs58::encode(id).into_string())
|
||||||
}
|
}
|
||||||
pub fn get_signature_status(&self, signature: Signature) -> Option<Status<bank::Result<()>>> {
|
pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> {
|
||||||
self.bank.get_signature_status(&signature)
|
self.bank.get_signature_status(&signature)
|
||||||
}
|
}
|
||||||
fn get_transaction_count(&self) -> Result<u64> {
|
fn get_transaction_count(&self) -> Result<u64> {
|
||||||
|
|
|
@ -10,7 +10,6 @@ use crate::jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId};
|
||||||
use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder};
|
use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder};
|
||||||
use crate::rpc::RpcSignatureStatus;
|
use crate::rpc::RpcSignatureStatus;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::status_deque::Status;
|
|
||||||
use bs58;
|
use bs58;
|
||||||
use solana_sdk::account::Account;
|
use solana_sdk::account::Account;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
@ -322,7 +321,7 @@ impl RpcSolPubSubImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
match status.unwrap() {
|
match status.unwrap() {
|
||||||
Status::Complete(Ok(_)) => {
|
Ok(_) => {
|
||||||
sink.notify(Ok(RpcSignatureStatus::Confirmed))
|
sink.notify(Ok(RpcSignatureStatus::Confirmed))
|
||||||
.wait()
|
.wait()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
@ -0,0 +1,238 @@
|
||||||
|
use crate::bloom::{Bloom, BloomHashIndex};
|
||||||
|
use crate::last_id_queue::MAX_ENTRY_IDS;
|
||||||
|
use hashbrown::HashMap;
|
||||||
|
use solana_sdk::hash::Hash;
|
||||||
|
use solana_sdk::signature::Signature;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
|
type FailureMap<T> = HashMap<Signature, T>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct StatusCache<T> {
|
||||||
|
/// all signatures seen at this checkpoint
|
||||||
|
signatures: Bloom<Signature>,
|
||||||
|
|
||||||
|
/// failures
|
||||||
|
failures: FailureMap<T>,
|
||||||
|
|
||||||
|
/// Merges are empty unless this is the root checkpoint which cannot be unrolled
|
||||||
|
merges: VecDeque<StatusCache<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone> Default for StatusCache<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new(&Hash::default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone> StatusCache<T> {
|
||||||
|
pub fn new(last_id: &Hash) -> Self {
|
||||||
|
let keys = (0..27).map(|i| last_id.hash_at_index(i)).collect();
|
||||||
|
Self {
|
||||||
|
signatures: Bloom::new(38_340_234, keys),
|
||||||
|
failures: HashMap::new(),
|
||||||
|
merges: VecDeque::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn has_signature_merged(&self, sig: &Signature) -> bool {
|
||||||
|
for c in &self.merges {
|
||||||
|
if c.has_signature(sig) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
/// test if a signature is known
|
||||||
|
pub fn has_signature(&self, sig: &Signature) -> bool {
|
||||||
|
self.signatures.contains(&sig) || self.has_signature_merged(sig)
|
||||||
|
}
|
||||||
|
/// add a signature
|
||||||
|
pub fn add(&mut self, sig: &Signature) {
|
||||||
|
self.signatures.add(&sig)
|
||||||
|
}
|
||||||
|
/// Save an error status for a signature
|
||||||
|
pub fn save_failure_status(&mut self, sig: &Signature, err: T) {
|
||||||
|
assert!(self.has_signature(sig), "sig not found");
|
||||||
|
self.failures.insert(*sig, err);
|
||||||
|
}
|
||||||
|
/// Forget all signatures. Useful for benchmarking.
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.failures.clear();
|
||||||
|
self.signatures.clear();
|
||||||
|
}
|
||||||
|
fn get_signature_status_merged(&self, sig: &Signature) -> Option<Result<(), T>> {
|
||||||
|
for c in &self.merges {
|
||||||
|
if c.has_signature(sig) {
|
||||||
|
return c.get_signature_status(sig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
pub fn get_signature_status(&self, sig: &Signature) -> Option<Result<(), T>> {
|
||||||
|
if let Some(res) = self.failures.get(sig) {
|
||||||
|
return Some(Err(res.clone()));
|
||||||
|
} else if self.signatures.contains(sig) {
|
||||||
|
return Some(Ok(()));
|
||||||
|
}
|
||||||
|
self.get_signature_status_merged(sig)
|
||||||
|
}
|
||||||
|
/// like accounts, status cache starts with an new data structure for every checkpoint
|
||||||
|
/// so only merge is implemented
|
||||||
|
/// but the merges maintains a history
|
||||||
|
pub fn merge_into_root(&mut self, other: Self) {
|
||||||
|
// merges should be empty for every other checkpoint accept the root
|
||||||
|
// which cannot be rolled back
|
||||||
|
assert!(other.merges.is_empty());
|
||||||
|
self.merges.push_front(other);
|
||||||
|
if self.merges.len() > MAX_ENTRY_IDS {
|
||||||
|
//TODO check if this is the right size ^
|
||||||
|
self.merges.pop_back();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn get_signature_status_all<U>(
|
||||||
|
checkpoints: &[U],
|
||||||
|
signature: &Signature,
|
||||||
|
) -> Option<Result<(), T>>
|
||||||
|
where
|
||||||
|
U: Deref<Target = Self>,
|
||||||
|
{
|
||||||
|
for c in checkpoints {
|
||||||
|
if let Some(status) = c.get_signature_status(signature) {
|
||||||
|
return Some(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
pub fn has_signature_all<U>(checkpoints: &[U], signature: &Signature) -> bool
|
||||||
|
where
|
||||||
|
U: Deref<Target = Self>,
|
||||||
|
{
|
||||||
|
for c in checkpoints {
|
||||||
|
if c.has_signature(signature) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
pub fn clear_all<U>(checkpoints: &mut [U]) -> bool
|
||||||
|
where
|
||||||
|
U: DerefMut<Target = Self>,
|
||||||
|
{
|
||||||
|
for c in checkpoints.iter_mut() {
|
||||||
|
c.clear();
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::bank::BankError;
|
||||||
|
use solana_sdk::hash::hash;
|
||||||
|
|
||||||
|
type BankStatusCache = StatusCache<BankError>;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_has_signature() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut status_cache = BankStatusCache::new(&last_id);
|
||||||
|
assert_eq!(status_cache.has_signature(&sig), false);
|
||||||
|
assert_eq!(status_cache.get_signature_status(&sig), None,);
|
||||||
|
status_cache.add(&sig);
|
||||||
|
assert_eq!(status_cache.has_signature(&sig), true);
|
||||||
|
assert_eq!(status_cache.get_signature_status(&sig), Some(Ok(())),);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_has_signature_checkpoint() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = BankStatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())));
|
||||||
|
let last_id = hash(last_id.as_ref());
|
||||||
|
let second = StatusCache::new(&last_id);
|
||||||
|
let checkpoints = [&second, &first];
|
||||||
|
assert_eq!(
|
||||||
|
BankStatusCache::get_signature_status_all(&checkpoints, &sig),
|
||||||
|
Some(Ok(())),
|
||||||
|
);
|
||||||
|
assert!(StatusCache::has_signature_all(&checkpoints, &sig));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_has_signature_merged1() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = BankStatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())));
|
||||||
|
let last_id = hash(last_id.as_ref());
|
||||||
|
let second = BankStatusCache::new(&last_id);
|
||||||
|
first.merge_into_root(second);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())),);
|
||||||
|
assert!(first.has_signature(&sig));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_has_signature_merged2() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = BankStatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), Some(Ok(())));
|
||||||
|
let last_id = hash(last_id.as_ref());
|
||||||
|
let mut second = BankStatusCache::new(&last_id);
|
||||||
|
second.merge_into_root(first);
|
||||||
|
assert_eq!(second.get_signature_status(&sig), Some(Ok(())),);
|
||||||
|
assert!(second.has_signature(&sig));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_failure_status() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = StatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
first.save_failure_status(&sig, BankError::DuplicateSignature);
|
||||||
|
assert_eq!(first.has_signature(&sig), true);
|
||||||
|
assert_eq!(
|
||||||
|
first.get_signature_status(&sig),
|
||||||
|
Some(Err(BankError::DuplicateSignature)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_clear_signatures() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = StatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.has_signature(&sig), true);
|
||||||
|
first.save_failure_status(&sig, BankError::DuplicateSignature);
|
||||||
|
assert_eq!(
|
||||||
|
first.get_signature_status(&sig),
|
||||||
|
Some(Err(BankError::DuplicateSignature)),
|
||||||
|
);
|
||||||
|
first.clear();
|
||||||
|
assert_eq!(first.has_signature(&sig), false);
|
||||||
|
assert_eq!(first.get_signature_status(&sig), None,);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_clear_signatures_all() {
|
||||||
|
let sig = Default::default();
|
||||||
|
let last_id = hash(Hash::default().as_ref());
|
||||||
|
let mut first = StatusCache::new(&last_id);
|
||||||
|
first.add(&sig);
|
||||||
|
assert_eq!(first.has_signature(&sig), true);
|
||||||
|
let mut second = StatusCache::new(&last_id);
|
||||||
|
let mut checkpoints = [&mut second, &mut first];
|
||||||
|
BankStatusCache::clear_all(&mut checkpoints);
|
||||||
|
assert_eq!(
|
||||||
|
BankStatusCache::has_signature_all(&checkpoints, &sig),
|
||||||
|
false
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,307 +0,0 @@
|
||||||
use crate::poh_service::NUM_TICKS_PER_SECOND;
|
|
||||||
use hashbrown::HashMap;
|
|
||||||
use solana_sdk::hash::Hash;
|
|
||||||
use solana_sdk::signature::Signature;
|
|
||||||
use solana_sdk::timing::timestamp;
|
|
||||||
use std::result;
|
|
||||||
|
|
||||||
/// The number of most recent `last_id` values that the bank will track the signatures
|
|
||||||
/// of. Once the bank discards a `last_id`, it will reject any transactions that use
|
|
||||||
/// that `last_id` in a transaction. Lowering this value reduces memory consumption,
|
|
||||||
/// but requires clients to update its `last_id` more frequently. Raising the value
|
|
||||||
/// lengthens the time a client must wait to be certain a missing transaction will
|
|
||||||
/// not be processed by the network.
|
|
||||||
pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120;
|
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
|
||||||
pub enum Status<T> {
|
|
||||||
Reserved,
|
|
||||||
Complete(T),
|
|
||||||
}
|
|
||||||
|
|
||||||
type StatusMap<T> = HashMap<Signature, Status<T>>;
|
|
||||||
type StatusEntryMap<T> = HashMap<Hash, StatusEntry<T>>;
|
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
|
||||||
pub enum StatusDequeError {
|
|
||||||
/// The `Signature` has been seen before. This can occur under normal operation
|
|
||||||
/// when a UDP packet is duplicated, as a user error from a client not updating
|
|
||||||
/// its `last_id`, or as a double-spend attack.
|
|
||||||
DuplicateSignature,
|
|
||||||
|
|
||||||
/// The bank has not seen the given `last_id` or the transaction is too old and
|
|
||||||
/// the `last_id` has been discarded.
|
|
||||||
LastIdNotFound,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type Result<T> = result::Result<T, StatusDequeError>;
|
|
||||||
|
|
||||||
/// a record of a tick, from register_tick
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct StatusEntry<T> {
|
|
||||||
/// when the id was registered, according to network time
|
|
||||||
tick_height: u64,
|
|
||||||
|
|
||||||
/// timestamp when this id was registered, used for stats/confirmation
|
|
||||||
timestamp: u64,
|
|
||||||
|
|
||||||
/// a map of signature status, used for duplicate detection
|
|
||||||
statuses: StatusMap<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct StatusDeque<T> {
|
|
||||||
/// A FIFO queue of `last_id` items, where each item is a set of signatures
|
|
||||||
/// that have been processed using that `last_id`. Rejected `last_id`
|
|
||||||
/// values are so old that the `last_id` has been pulled out of the queue.
|
|
||||||
|
|
||||||
/// updated whenever an id is registered, at each tick ;)
|
|
||||||
pub tick_height: u64,
|
|
||||||
|
|
||||||
/// last tick to be registered
|
|
||||||
pub last_id: Option<Hash>,
|
|
||||||
|
|
||||||
/// Mapping of hashes to signature sets along with timestamp and what tick_height
|
|
||||||
/// was when the id was added. The bank uses this data to
|
|
||||||
/// reject transactions with signatures it's seen before and to reject
|
|
||||||
/// transactions that are too old (nth is too small)
|
|
||||||
entries: StatusEntryMap<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Default for StatusDeque<T> {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
tick_height: 0,
|
|
||||||
last_id: None,
|
|
||||||
entries: HashMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Clone> StatusDeque<T> {
|
|
||||||
pub fn update_signature_status_with_last_id(
|
|
||||||
&mut self,
|
|
||||||
signature: &Signature,
|
|
||||||
result: &T,
|
|
||||||
last_id: &Hash,
|
|
||||||
) {
|
|
||||||
if let Some(entry) = self.entries.get_mut(last_id) {
|
|
||||||
entry
|
|
||||||
.statuses
|
|
||||||
.insert(*signature, Status::Complete(result.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn reserve_signature_with_last_id(
|
|
||||||
&mut self,
|
|
||||||
last_id: &Hash,
|
|
||||||
sig: &Signature,
|
|
||||||
) -> Result<()> {
|
|
||||||
if let Some(entry) = self.entries.get_mut(last_id) {
|
|
||||||
if self.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 {
|
|
||||||
return Self::reserve_signature(&mut entry.statuses, sig);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(StatusDequeError::LastIdNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store the given signature. The bank will reject any transaction with the same signature.
|
|
||||||
fn reserve_signature(statuses: &mut StatusMap<T>, signature: &Signature) -> Result<()> {
|
|
||||||
if let Some(_result) = statuses.get(signature) {
|
|
||||||
return Err(StatusDequeError::DuplicateSignature);
|
|
||||||
}
|
|
||||||
statuses.insert(*signature, Status::Reserved);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Forget all signatures. Useful for benchmarking.
|
|
||||||
pub fn clear_signatures(&mut self) {
|
|
||||||
for entry in &mut self.entries.values_mut() {
|
|
||||||
entry.statuses.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if the age of the entry_id is within the max_age
|
|
||||||
/// return false for any entries with an age equal to or above max_age
|
|
||||||
pub fn check_entry_id_age(&self, entry_id: Hash, max_age: usize) -> bool {
|
|
||||||
let entry = self.entries.get(&entry_id);
|
|
||||||
|
|
||||||
match entry {
|
|
||||||
Some(entry) => self.tick_height - entry.tick_height < max_age as u64,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Tell the bank which Entry IDs exist on the ledger. This function
|
|
||||||
/// assumes subsequent calls correspond to later entries, and will boot
|
|
||||||
/// the oldest ones once its internal cache is full. Once boot, the
|
|
||||||
/// bank will reject transactions using that `last_id`.
|
|
||||||
pub fn register_tick(&mut self, last_id: &Hash) {
|
|
||||||
self.tick_height += 1;
|
|
||||||
let tick_height = self.tick_height;
|
|
||||||
|
|
||||||
// this clean up can be deferred until sigs gets larger
|
|
||||||
// because we verify entry.nth every place we check for validity
|
|
||||||
if self.entries.len() >= MAX_ENTRY_IDS as usize {
|
|
||||||
self.entries
|
|
||||||
.retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.entries.insert(
|
|
||||||
*last_id,
|
|
||||||
StatusEntry {
|
|
||||||
tick_height,
|
|
||||||
timestamp: timestamp(),
|
|
||||||
statuses: HashMap::new(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
self.last_id = Some(*last_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Looks through a list of tick heights and stakes, and finds the latest
|
|
||||||
/// tick that has achieved confirmation
|
|
||||||
pub fn get_confirmation_timestamp(
|
|
||||||
&self,
|
|
||||||
ticks_and_stakes: &mut [(u64, u64)],
|
|
||||||
supermajority_stake: u64,
|
|
||||||
) -> Option<u64> {
|
|
||||||
// Sort by tick height
|
|
||||||
ticks_and_stakes.sort_by(|a, b| b.0.cmp(&a.0));
|
|
||||||
let current_tick_height = self.tick_height;
|
|
||||||
let mut total = 0;
|
|
||||||
for (tick_height, stake) in ticks_and_stakes.iter() {
|
|
||||||
if current_tick_height > *tick_height
|
|
||||||
&& ((current_tick_height - tick_height) as usize) < MAX_ENTRY_IDS
|
|
||||||
{
|
|
||||||
total += stake;
|
|
||||||
if total > supermajority_stake {
|
|
||||||
return self.tick_height_to_timestamp(*tick_height);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Maps a tick height to a timestamp
|
|
||||||
fn tick_height_to_timestamp(&self, tick_height: u64) -> Option<u64> {
|
|
||||||
for entry in self.entries.values() {
|
|
||||||
if entry.tick_height == tick_height {
|
|
||||||
return Some(entry.timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_signature_status(&self, signature: &Signature) -> Option<Status<T>> {
|
|
||||||
for entry in self.entries.values() {
|
|
||||||
if let Some(res) = entry.statuses.get(signature) {
|
|
||||||
return Some(res.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
pub fn has_signature(&self, signature: &Signature) -> bool {
|
|
||||||
self.get_signature_status(signature).is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option<Status<T>> {
|
|
||||||
self.entries
|
|
||||||
.get(last_id)
|
|
||||||
.and_then(|entry| entry.statuses.get(signature).cloned())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use bincode::serialize;
|
|
||||||
use solana_sdk::hash::hash;
|
|
||||||
#[test]
|
|
||||||
fn test_duplicate_transaction_signature() {
|
|
||||||
let sig = Default::default();
|
|
||||||
let last_id = Default::default();
|
|
||||||
let mut status_deque: StatusDeque<()> = StatusDeque::default();
|
|
||||||
status_deque.register_tick(&last_id);
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.reserve_signature_with_last_id(&last_id, &sig),
|
|
||||||
Ok(())
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.reserve_signature_with_last_id(&last_id, &sig),
|
|
||||||
Err(StatusDequeError::DuplicateSignature)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_clear_signatures() {
|
|
||||||
let signature = Signature::default();
|
|
||||||
let last_id = Default::default();
|
|
||||||
let mut status_deque: StatusDeque<()> = StatusDeque::default();
|
|
||||||
status_deque.register_tick(&last_id);
|
|
||||||
status_deque
|
|
||||||
.reserve_signature_with_last_id(&last_id, &signature)
|
|
||||||
.unwrap();
|
|
||||||
status_deque.clear_signatures();
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.reserve_signature_with_last_id(&last_id, &signature),
|
|
||||||
Ok(())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_get_signature_status() {
|
|
||||||
let signature = Signature::default();
|
|
||||||
let last_id = Default::default();
|
|
||||||
let mut status_deque: StatusDeque<()> = StatusDeque::default();
|
|
||||||
status_deque.register_tick(&last_id);
|
|
||||||
status_deque
|
|
||||||
.reserve_signature_with_last_id(&last_id, &signature)
|
|
||||||
.expect("reserve signature");
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.get_signature_status(&signature),
|
|
||||||
Some(Status::Reserved)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_register_tick() {
|
|
||||||
let signature = Signature::default();
|
|
||||||
let last_id = Default::default();
|
|
||||||
let mut status_deque: StatusDeque<()> = StatusDeque::default();
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.reserve_signature_with_last_id(&last_id, &signature),
|
|
||||||
Err(StatusDequeError::LastIdNotFound)
|
|
||||||
);
|
|
||||||
status_deque.register_tick(&last_id);
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.reserve_signature_with_last_id(&last_id, &signature),
|
|
||||||
Ok(())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_has_signature() {
|
|
||||||
let signature = Signature::default();
|
|
||||||
let last_id = Default::default();
|
|
||||||
let mut status_deque: StatusDeque<()> = StatusDeque::default();
|
|
||||||
status_deque.register_tick(&last_id);
|
|
||||||
status_deque
|
|
||||||
.reserve_signature_with_last_id(&last_id, &signature)
|
|
||||||
.expect("reserve signature");
|
|
||||||
assert!(status_deque.has_signature(&signature));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_reject_old_last_id() {
|
|
||||||
let signature = Signature::default();
|
|
||||||
let last_id = Default::default();
|
|
||||||
let mut status_deque: StatusDeque<()> = StatusDeque::default();
|
|
||||||
for i in 0..MAX_ENTRY_IDS {
|
|
||||||
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
|
||||||
status_deque.register_tick(&last_id);
|
|
||||||
}
|
|
||||||
// Assert we're no longer able to use the oldest entry ID.
|
|
||||||
assert_eq!(
|
|
||||||
status_deque.reserve_signature_with_last_id(&last_id, &signature),
|
|
||||||
Err(StatusDequeError::LastIdNotFound)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,7 +3,6 @@ use solana_native_loader;
|
||||||
|
|
||||||
use solana::bank::Bank;
|
use solana::bank::Bank;
|
||||||
use solana::genesis_block::GenesisBlock;
|
use solana::genesis_block::GenesisBlock;
|
||||||
use solana::status_deque::Status;
|
|
||||||
#[cfg(feature = "bpf_c")]
|
#[cfg(feature = "bpf_c")]
|
||||||
use solana_sdk::bpf_loader;
|
use solana_sdk::bpf_loader;
|
||||||
use solana_sdk::loader_transaction::LoaderTransaction;
|
use solana_sdk::loader_transaction::LoaderTransaction;
|
||||||
|
@ -39,10 +38,7 @@ fn create_bpf_path(name: &str) -> PathBuf {
|
||||||
fn check_tx_results(bank: &Bank, tx: &Transaction, result: Vec<solana::bank::Result<()>>) {
|
fn check_tx_results(bank: &Bank, tx: &Transaction, result: Vec<solana::bank::Result<()>>) {
|
||||||
assert_eq!(result.len(), 1);
|
assert_eq!(result.len(), 1);
|
||||||
assert_eq!(result[0], Ok(()));
|
assert_eq!(result[0], Ok(()));
|
||||||
assert_eq!(
|
assert_eq!(bank.get_signature_status(&tx.signatures[0]), Some(Ok(())));
|
||||||
bank.get_signature(&tx.last_id, &tx.signatures[0]),
|
|
||||||
Some(Status::Complete(Ok(())))
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Loader {
|
struct Loader {
|
||||||
|
|
Loading…
Reference in New Issue