Add checkpoint, rollback to to bank (#1662)

add linked-list capability to accounts

change accounts from a linked list to a VecDeque

add checkpoint and rollback for lastids

add subscriber notifications for rollbacks

checkpoint transaction count, too
This commit is contained in:
Rob Walker 2018-11-05 09:47:41 -08:00 committed by GitHub
parent 5a85cc4626
commit 1fbf1d2cf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 385 additions and 191 deletions

View File

@ -44,7 +44,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
let mut id = bank.last_id(); let mut id = bank.last_id();
for _ in 0..(MAX_ENTRY_IDS - 1) { for _ in 0..(MAX_ENTRY_IDS - 1) {
bank.register_entry_id(&id); bank.register_tick(&id);
id = hash(&id.as_ref()) id = hash(&id.as_ref())
} }

View File

@ -14,7 +14,7 @@ use solana::entry::Entry;
use solana::hash::hash; use solana::hash::hash;
use solana::mint::Mint; use solana::mint::Mint;
use solana::packet::to_packets_chunked; use solana::packet::to_packets_chunked;
use solana::signature::{Keypair, KeypairUtil, Signature}; use solana::signature::{KeypairUtil, Signature};
use solana::system_transaction::SystemTransaction; use solana::system_transaction::SystemTransaction;
use solana::transaction::Transaction; use solana::transaction::Transaction;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -110,13 +110,13 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let mut id = mint.last_id(); let mut id = mint.last_id();
for _ in 0..MAX_ENTRY_IDS { for _ in 0..MAX_ENTRY_IDS {
id = hash(&id.as_ref()); id = hash(&id.as_ref());
bank.register_entry_id(&id); bank.register_tick(&id);
} }
bencher.iter(move || { bencher.iter(move || {
// make sure the tx last id is still registered // make sure the tx last id is still registered
if bank.count_valid_ids(&[mint.last_id()]).len() == 0 { if bank.count_valid_ids(&[mint.last_id()]).len() == 0 {
bank.register_entry_id(&mint.last_id()); bank.register_tick(&mint.last_id());
} }
for v in verified.chunks(verified.len() / NUM_THREADS) { for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();
@ -210,13 +210,13 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let mut id = mint.last_id(); let mut id = mint.last_id();
for _ in 0..MAX_ENTRY_IDS { for _ in 0..MAX_ENTRY_IDS {
id = hash(&id.as_ref()); id = hash(&id.as_ref());
bank.register_entry_id(&id); bank.register_tick(&id);
} }
bencher.iter(move || { bencher.iter(move || {
// make sure the transactions are still valid // make sure the transactions are still valid
if bank.count_valid_ids(&[mint.last_id()]).len() == 0 { if bank.count_valid_ids(&[mint.last_id()]).len() == 0 {
bank.register_entry_id(&mint.last_id()); bank.register_tick(&mint.last_id());
} }
for v in verified.chunks(verified.len() / NUM_THREADS) { for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();

View File

@ -27,7 +27,7 @@ use signature::Signature;
use solana_sdk::account::{Account, KeyedAccount}; use solana_sdk::account::{Account, KeyedAccount};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std; use std;
use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::result; use std::result;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@ -117,48 +117,177 @@ struct ErrorCounters {
duplicate_signature: usize, duplicate_signature: usize,
} }
pub trait Checkpoint {
/// add a checkpoint to this data at current state
fn checkpoint(&mut self);
/// rollback to previous state, panics if no prior checkpoint
fn rollback(&mut self);
/// cull checkpoints to depth, that is depth of zero means
/// no checkpoints, only current state
fn purge(&mut self, depth: usize);
/// returns the number of checkpoints
fn depth(&self) -> usize;
}
/// a record of a tick, from register_tick
#[derive(Clone)]
pub struct LastIdEntry {
/// when the id was registered, according to network time
tick_height: u64,
/// timestamp when this id was registered, used for stats/finality
timestamp: u64,
/// a map of signature status, used for duplicate detection
signature_status: SignatureStatusMap,
}
pub struct LastIds { pub struct LastIds {
/// A FIFO queue of `last_id` items, where each item is a set of signatures /// A FIFO queue of `last_id` items, where each item is a set of signatures
/// that have been processed using that `last_id`. Rejected `last_id` /// that have been processed using that `last_id`. Rejected `last_id`
/// values are so old that the `last_id` has been pulled out of the queue. /// values are so old that the `last_id` has been pulled out of the queue.
/// updated whenever an id is registered /// updated whenever an id is registered, at each tick ;)
tick_height: u64, tick_height: u64,
/// last id to be registered /// last tick to be registered
last: Option<Hash>, last_id: Option<Hash>,
/// Mapping of hashes to signature sets along with timestamp and what tick_height /// Mapping of hashes to signature sets along with timestamp and what tick_height
/// was when the id was added. The bank uses this data to /// was when the id was added. The bank uses this data to
/// reject transactions with signatures it's seen before and to reject /// reject transactions with signatures it's seen before and to reject
/// transactions that are too old (tick_height is too small) /// transactions that are too old (nth is too small)
sigs: HashMap<Hash, (SignatureStatusMap, u64, u64)>, entries: HashMap<Hash, LastIdEntry>,
checkpoints: VecDeque<(u64, Option<Hash>, HashMap<Hash, LastIdEntry>)>,
} }
impl Default for LastIds { impl Default for LastIds {
fn default() -> Self { fn default() -> Self {
LastIds { LastIds {
tick_height: 0, tick_height: 0,
last: None, last_id: None,
sigs: HashMap::new(), entries: HashMap::new(),
checkpoints: VecDeque::new(),
} }
} }
} }
/// The state of all accounts and contracts after processing its entries. impl Checkpoint for LastIds {
fn checkpoint(&mut self) {
self.checkpoints
.push_front((self.tick_height, self.last_id, self.entries.clone()));
}
fn rollback(&mut self) {
let (tick_height, last_id, entries) = self.checkpoints.pop_front().unwrap();
self.tick_height = tick_height;
self.last_id = last_id;
self.entries = entries;
}
fn purge(&mut self, depth: usize) {
while self.depth() > depth {
self.checkpoints.pop_back().unwrap();
}
}
fn depth(&self) -> usize {
self.checkpoints.len()
}
}
#[derive(Default)]
pub struct Accounts {
// TODO: implement values() or something? take this back to private
// from the voting/leader/finality code
// issue #1701
pub accounts: HashMap<Pubkey, Account>,
/// The number of transactions the bank has processed without error since the
/// start of the ledger.
transaction_count: u64,
/// list of prior states
checkpoints: VecDeque<(HashMap<Pubkey, Account>, u64)>,
}
impl Accounts {
fn load(&self, pubkey: &Pubkey) -> Option<&Account> {
if let Some(account) = self.accounts.get(pubkey) {
return Some(account);
}
for (accounts, _) in &self.checkpoints {
if let Some(account) = accounts.get(pubkey) {
return Some(account);
}
}
None
}
fn store(&mut self, pubkey: &Pubkey, account: &Account) {
// purge if balance is 0 and no checkpoints
if account.tokens == 0 && self.checkpoints.is_empty() {
self.accounts.remove(pubkey);
} else {
self.accounts.insert(pubkey.clone(), account.clone());
}
}
fn increment_transaction_count(&mut self, tx_count: usize) {
self.transaction_count += tx_count as u64;
}
fn transaction_count(&self) -> u64 {
self.transaction_count
}
}
impl Checkpoint for Accounts {
fn checkpoint(&mut self) {
let mut accounts = HashMap::new();
std::mem::swap(&mut self.accounts, &mut accounts);
self.checkpoints
.push_front((accounts, self.transaction_count));
}
fn rollback(&mut self) {
let (accounts, transaction_count) = self.checkpoints.pop_front().unwrap();
self.accounts = accounts;
self.transaction_count = transaction_count;
}
fn purge(&mut self, depth: usize) {
while self.depth() > depth {
let (mut purge, _) = self.checkpoints.pop_back().unwrap();
if let Some((last, _)) = self.checkpoints.back_mut() {
purge.retain(|pubkey, account| !last.contains_key(pubkey) && account.tokens != 0);
last.extend(purge.drain());
continue;
}
purge.retain(|pubkey, account| {
!self.accounts.contains_key(pubkey) && account.tokens != 0
});
self.accounts.extend(purge.drain());
}
}
fn depth(&self) -> usize {
self.checkpoints.len()
}
}
/// Manager for the state of all accounts and contracts after processing its entries.
pub struct Bank { pub struct Bank {
/// A map of account public keys to the balance in that account. /// A map of account public keys to the balance in that account.
pub accounts: RwLock<HashMap<Pubkey, Account>>, pub accounts: RwLock<Accounts>,
/// set of accounts which are currently in the pipeline
account_locks: Mutex<HashSet<Pubkey>>,
/// FIFO queue of `last_id` items /// FIFO queue of `last_id` items
last_ids: RwLock<LastIds>, last_ids: RwLock<LastIds>,
/// The number of transactions the bank has processed without error since the /// set of accounts which are currently in the pipeline
/// start of the ledger. account_locks: Mutex<HashSet<Pubkey>>,
transaction_count: AtomicUsize,
// The latest finality time for the network // The latest finality time for the network
finality_time: AtomicUsize, finality_time: AtomicUsize,
@ -177,10 +306,9 @@ pub struct Bank {
impl Default for Bank { impl Default for Bank {
fn default() -> Self { fn default() -> Self {
Bank { Bank {
accounts: RwLock::new(HashMap::new()), accounts: RwLock::new(Accounts::default()),
account_locks: Mutex::new(HashSet::new()),
last_ids: RwLock::new(LastIds::default()), last_ids: RwLock::new(LastIds::default()),
transaction_count: AtomicUsize::new(0), account_locks: Mutex::new(HashSet::new()),
finality_time: AtomicUsize::new(std::usize::MAX), finality_time: AtomicUsize::new(std::usize::MAX),
account_subscriptions: RwLock::new(HashMap::new()), account_subscriptions: RwLock::new(HashMap::new()),
signature_subscriptions: RwLock::new(HashMap::new()), signature_subscriptions: RwLock::new(HashMap::new()),
@ -202,13 +330,44 @@ impl Bank {
let bank = Self::default(); let bank = Self::default();
for deposit in deposits { for deposit in deposits {
let mut accounts = bank.accounts.write().unwrap(); let mut accounts = bank.accounts.write().unwrap();
let account = accounts.entry(deposit.to).or_insert_with(Account::default);
Self::apply_payment(deposit, account); let mut account = Account::default();
account.tokens += deposit.tokens;
accounts.store(&deposit.to, &account);
} }
bank.add_builtin_programs(); bank.add_builtin_programs();
bank bank
} }
pub fn checkpoint(&self) {
self.accounts.write().unwrap().checkpoint();
self.last_ids.write().unwrap().checkpoint();
}
pub fn rollback(&self) {
let rolled_back_pubkeys: Vec<Pubkey> = self
.accounts
.read()
.unwrap()
.accounts
.keys()
.cloned()
.collect();
self.accounts.write().unwrap().rollback();
rolled_back_pubkeys.iter().for_each(|pubkey| {
if let Some(account) = self.accounts.read().unwrap().load(&pubkey) {
self.check_account_subscriptions(&pubkey, account)
}
});
self.last_ids.write().unwrap().rollback();
}
pub fn checkpoint_depth(&self) -> usize {
self.accounts.read().unwrap().depth()
}
/// Create an Bank with only a Mint. Typically used by unit tests. /// Create an Bank with only a Mint. Typically used by unit tests.
pub fn new(mint: &Mint) -> Self { pub fn new(mint: &Mint) -> Self {
let mint_tokens = if mint.bootstrap_leader_id != Pubkey::default() { let mint_tokens = if mint.bootstrap_leader_id != Pubkey::default() {
@ -231,36 +390,19 @@ impl Bank {
} else { } else {
vec![mint_deposit] vec![mint_deposit]
}; };
let bank = Self::new_from_deposits(&deposits); let bank = Self::new_from_deposits(&deposits);
bank.register_entry_id(&mint.last_id()); bank.register_tick(&mint.last_id());
bank bank
} }
fn add_builtin_programs(&self) { fn add_builtin_programs(&self) {
// Preload Bpf Loader program let mut accounts = self.accounts.write().unwrap();
{
let mut accounts = self.accounts.write().unwrap(); // Preload Bpf Loader account
let mut account = accounts accounts.store(&bpf_loader::id(), &bpf_loader::account());
.entry(bpf_loader::id())
.or_insert_with(Account::default);
bpf_loader::populate_account(&mut account);
}
// Preload Erc20 token program // Preload Erc20 token program
{ accounts.store(&token_program::id(), &token_program::account());
let mut accounts = self.accounts.write().unwrap();
let mut account = accounts
.entry(token_program::id())
.or_insert_with(Account::default);
token_program::populate_account(&mut account);
}
}
/// Commit funds to the given account
fn apply_payment(payment: &Payment, account: &mut Account) {
trace!("apply payments {}", payment.tokens);
account.tokens += payment.tokens;
} }
/// Return the last entry ID registered. /// Return the last entry ID registered.
@ -268,7 +410,7 @@ impl Bank {
self.last_ids self.last_ids
.read() .read()
.unwrap() .unwrap()
.last .last_id
.expect("no last_id has been set") .expect("no last_id has been set")
} }
@ -283,18 +425,18 @@ impl Bank {
/// Forget all signatures. Useful for benchmarking. /// Forget all signatures. Useful for benchmarking.
pub fn clear_signatures(&self) { pub fn clear_signatures(&self) {
for sigs in &mut self.last_ids.write().unwrap().sigs.values_mut() { for entry in &mut self.last_ids.write().unwrap().entries.values_mut() {
sigs.0.clear(); entry.signature_status.clear();
} }
} }
/// Check if the age of the entry_id is within the max_age /// Check if the age of the entry_id is within the max_age
/// return false for any entries with an age equal to or above max_age /// return false for any entries with an age equal to or above max_age
fn check_entry_id_age(last_ids: &LastIds, entry_id: Hash, max_age: usize) -> bool { fn check_entry_id_age(last_ids: &LastIds, entry_id: Hash, max_age: usize) -> bool {
let entry = last_ids.sigs.get(&entry_id); let entry = last_ids.entries.get(&entry_id);
match entry { match entry {
Some(entry) => ((last_ids.tick_height - entry.2) as usize) < max_age, Some(entry) => last_ids.tick_height - entry.tick_height < max_age as u64,
_ => false, _ => false,
} }
} }
@ -304,9 +446,9 @@ impl Bank {
last_id: &Hash, last_id: &Hash,
sig: &Signature, sig: &Signature,
) -> Result<()> { ) -> Result<()> {
if let Some(entry) = last_ids.sigs.get_mut(last_id) { if let Some(entry) = last_ids.entries.get_mut(last_id) {
if ((last_ids.tick_height - entry.2) as usize) < MAX_ENTRY_IDS { if last_ids.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 {
return Self::reserve_signature(&mut entry.0, sig); return Self::reserve_signature(&mut entry.signature_status, sig);
} }
} }
Err(BankError::LastIdNotFound) Err(BankError::LastIdNotFound)
@ -318,23 +460,14 @@ impl Bank {
Self::reserve_signature_with_last_id(&mut last_ids, last_id, sig) Self::reserve_signature_with_last_id(&mut last_ids, last_id, sig)
} }
fn update_signature_status(
signatures: &mut SignatureStatusMap,
signature: &Signature,
result: &Result<()>,
) {
let entry = signatures.entry(*signature).or_insert(Ok(()));
*entry = result.clone();
}
fn update_signature_status_with_last_id( fn update_signature_status_with_last_id(
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64, u64)>, last_ids_sigs: &mut HashMap<Hash, LastIdEntry>,
signature: &Signature, signature: &Signature,
result: &Result<()>, result: &Result<()>,
last_id: &Hash, last_id: &Hash,
) { ) {
if let Some(entry) = last_ids_sigs.get_mut(last_id) { if let Some(entry) = last_ids_sigs.get_mut(last_id) {
Self::update_signature_status(&mut entry.0, signature, result); entry.signature_status.insert(*signature, result.clone());
} }
} }
@ -342,7 +475,7 @@ impl Bank {
let mut last_ids = self.last_ids.write().unwrap(); let mut last_ids = self.last_ids.write().unwrap();
for (i, tx) in txs.iter().enumerate() { for (i, tx) in txs.iter().enumerate() {
Self::update_signature_status_with_last_id( Self::update_signature_status_with_last_id(
&mut last_ids.sigs, &mut last_ids.entries,
&tx.signature, &tx.signature,
&res[i], &res[i],
&tx.last_id, &tx.last_id,
@ -365,9 +498,9 @@ impl Bank {
/// Maps a tick height to a timestamp /// Maps a tick height to a timestamp
fn tick_height_to_timestamp(last_ids: &LastIds, tick_height: u64) -> Option<u64> { fn tick_height_to_timestamp(last_ids: &LastIds, tick_height: u64) -> Option<u64> {
for entry in last_ids.sigs.values() { for entry in last_ids.entries.values() {
if entry.2 == tick_height { if entry.tick_height == tick_height {
return Some(entry.1); return Some(entry.timestamp);
} }
} }
None None
@ -382,9 +515,9 @@ impl Bank {
let last_ids = self.last_ids.read().unwrap(); let last_ids = self.last_ids.read().unwrap();
let mut ret = Vec::new(); let mut ret = Vec::new();
for (i, id) in ids.iter().enumerate() { for (i, id) in ids.iter().enumerate() {
if let Some(entry) = last_ids.sigs.get(id) { if let Some(entry) = last_ids.entries.get(id) {
if ((last_ids.tick_height - entry.2) as usize) < MAX_ENTRY_IDS { if last_ids.tick_height - entry.tick_height < MAX_ENTRY_IDS as u64 {
ret.push((i, entry.1)); ret.push((i, entry.timestamp));
} }
} }
} }
@ -414,45 +547,36 @@ impl Bank {
None None
} }
/// Tell the bank about the genesis Entry IDs.
pub fn register_genesis_entry(&self, last_id: &Hash) {
let mut last_ids = self.last_ids.write().unwrap();
last_ids
.sigs
.insert(*last_id, (HashMap::new(), timestamp(), 0));
last_ids.last = Some(*last_id);
inc_new_counter_info!("bank-register_genesis_entry_id-registered", 1);
}
/// Tell the bank which Entry IDs exist on the ledger. This function /// Tell the bank which Entry IDs exist on the ledger. This function
/// assumes subsequent calls correspond to later entries, and will boot /// assumes subsequent calls correspond to later entries, and will boot
/// the oldest ones once its internal cache is full. Once boot, the /// the oldest ones once its internal cache is full. Once boot, the
/// bank will reject transactions using that `last_id`. /// bank will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) { pub fn register_tick(&self, last_id: &Hash) {
let mut last_ids = self.last_ids.write().unwrap(); let mut last_ids = self.last_ids.write().unwrap();
last_ids.tick_height += 1; last_ids.tick_height += 1;
let last_ids_tick_height = last_ids.tick_height; let tick_height = last_ids.tick_height;
// this clean up can be deferred until sigs gets larger // this clean up can be deferred until sigs gets larger
// because we verify entry.tick_height every place we check for validity // because we verify entry.nth every place we check for validity
if last_ids.sigs.len() >= MAX_ENTRY_IDS { if last_ids.entries.len() >= MAX_ENTRY_IDS as usize {
last_ids.sigs.retain(|_, (_, _, tick_height)| { last_ids
((last_ids_tick_height - *tick_height) as usize) < MAX_ENTRY_IDS .entries
}); .retain(|_, entry| tick_height - entry.tick_height <= MAX_ENTRY_IDS as u64);
} }
last_ids.sigs.insert( last_ids.entries.insert(
*last_id, *last_id,
(HashMap::new(), timestamp(), last_ids_tick_height), LastIdEntry {
tick_height,
timestamp: timestamp(),
signature_status: HashMap::new(),
},
); );
last_ids.last = Some(*last_id); last_ids.last_id = Some(*last_id);
inc_new_counter_info!("bank-register_entry_id-registered", 1); inc_new_counter_info!("bank-register_tick-registered", 1);
} }
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
@ -499,16 +623,16 @@ impl Bank {
fn load_account( fn load_account(
&self, &self,
tx: &Transaction, tx: &Transaction,
accounts: &HashMap<Pubkey, Account>, accounts: &Accounts,
last_ids: &mut LastIds, last_ids: &mut LastIds,
max_age: usize, max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Result<Vec<Account>> { ) -> Result<Vec<Account>> {
// Copy all the accounts // Copy all the accounts
if accounts.get(&tx.account_keys[0]).is_none() { if accounts.load(&tx.account_keys[0]).is_none() {
error_counters.account_not_found += 1; error_counters.account_not_found += 1;
Err(BankError::AccountNotFound) Err(BankError::AccountNotFound)
} else if accounts.get(&tx.account_keys[0]).unwrap().tokens < tx.fee { } else if accounts.load(&tx.account_keys[0]).unwrap().tokens < tx.fee {
error_counters.insufficient_funds += 1; error_counters.insufficient_funds += 1;
Err(BankError::InsufficientFundsForFee) Err(BankError::InsufficientFundsForFee)
} else { } else {
@ -530,7 +654,7 @@ impl Bank {
let mut called_accounts: Vec<Account> = tx let mut called_accounts: Vec<Account> = tx
.account_keys .account_keys
.iter() .iter()
.map(|key| accounts.get(key).cloned().unwrap_or_default()) .map(|key| accounts.load(key).cloned().unwrap_or_default())
.collect(); .collect();
called_accounts[0].tokens -= tx.fee; called_accounts[0].tokens -= tx.fee;
Ok(called_accounts) Ok(called_accounts)
@ -783,13 +907,7 @@ impl Bank {
let tx = &txs[i]; let tx = &txs[i];
let acc = racc.as_ref().unwrap(); let acc = racc.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(acc.iter()) { for (key, account) in tx.account_keys.iter().zip(acc.iter()) {
//purge if 0 accounts.store(key, account);
if account.tokens == 0 {
accounts.remove(&key);
} else {
*accounts.entry(*key).or_insert_with(Account::default) = account.clone();
assert_eq!(accounts.get(key).unwrap().tokens, account.tokens);
}
} }
} }
} }
@ -809,7 +927,8 @@ impl Bank {
// the likelyhood of any single thread getting starved and processing old ids. // the likelyhood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue // TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires. // expires.
let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS / 2); let results =
self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS as usize / 2);
let process_time = now.elapsed(); let process_time = now.elapsed();
let now = Instant::now(); let now = Instant::now();
self.record_transactions(txs, &results, poh)?; self.record_transactions(txs, &results, poh)?;
@ -914,8 +1033,11 @@ impl Bank {
inc_new_counter_info!("bank-process_transactions-error_count", err_count); inc_new_counter_info!("bank-process_transactions-error_count", err_count);
} }
self.transaction_count self.accounts
.fetch_add(tx_count, Ordering::Relaxed); .write()
.unwrap()
.increment_transaction_count(tx_count);
inc_new_counter_info!("bank-process_transactions-txs", tx_count); inc_new_counter_info!("bank-process_transactions-txs", tx_count);
if 0 != error_counters.last_id_not_found { if 0 != error_counters.last_id_not_found {
inc_new_counter_info!( inc_new_counter_info!(
@ -958,12 +1080,11 @@ impl Bank {
result?; result?;
} }
} else { } else {
self.register_entry_id(&entry.id); self.register_tick(&entry.id);
let tick_height = self.last_ids.read().unwrap().tick_height;
self.leader_scheduler self.leader_scheduler
.write() .write()
.unwrap() .unwrap()
.update_height(tick_height, self); .update_height(self.tick_height(), self);
} }
Ok(()) Ok(())
@ -1007,7 +1128,7 @@ impl Bank {
if entry.is_tick() { if entry.is_tick() {
// if its a tick, execute the group and register the tick // if its a tick, execute the group and register the tick
self.par_execute_entries(&mt_group)?; self.par_execute_entries(&mt_group)?;
self.register_entry_id(&entry.id); self.register_tick(&entry.id);
mt_group = vec![]; mt_group = vec![];
continue; continue;
} }
@ -1034,11 +1155,6 @@ impl Bank {
/// as we go. /// as we go.
fn process_block(&self, entries: &[Entry]) -> Result<()> { fn process_block(&self, entries: &[Entry]) -> Result<()> {
for entry in entries { for entry in entries {
// TODO: We prepare for implementing voting contract by making the associated
// process_entries functions aware of the vote-tracking structure inside
// the leader scheduler. Next we will extract the vote tracking structure
// out of the leader scheduler, and into the bank, and remove the leader
// scheduler from these banking functions.
self.process_entry(entry)?; self.process_entry(entry)?;
} }
@ -1100,6 +1216,10 @@ impl Bank {
let entry1 = entries let entry1 = entries
.next() .next()
.expect("invalid ledger: need at least 2 entries"); .expect("invalid ledger: need at least 2 entries");
// genesis should conform to PoH
assert!(entry1.verify(&entry0.id));
{ {
// Process the first transaction // Process the first transaction
let tx = &entry1.transactions[0]; let tx = &entry1.transactions[0];
@ -1125,29 +1245,34 @@ impl Bank {
{ {
// 1) Deposit into the mint // 1) Deposit into the mint
let mut accounts = self.accounts.write().unwrap(); let mut accounts = self.accounts.write().unwrap();
{
let account = accounts let mut account = accounts
.entry(tx.account_keys[0]) .load(&tx.account_keys[0])
.or_insert_with(Account::default); .cloned()
account.tokens += mint_deposit - leader_payment; .unwrap_or_default();
trace!( account.tokens += mint_deposit - leader_payment;
"applied genesis payment to mint {:?} => {:?}", accounts.store(&tx.account_keys[0], &account);
mint_deposit - leader_payment, trace!(
account "applied genesis payment {:?} => {:?}",
); mint_deposit - leader_payment,
} account
);
// 2) Transfer tokens to the bootstrap leader. The first two // 2) Transfer tokens to the bootstrap leader. The first two
// account keys will both be the mint (because the mint is the source // account keys will both be the mint (because the mint is the source
// for this trnsaction and the first move instruction is to the the // for this transaction and the first move instruction is to the the
// mint itself), so we look at the third account key to find the first // mint itself), so we look at the third account key to find the first
// leader id. // leader id.
let bootstrap_leader_id = tx.account_keys[2]; let bootstrap_leader_id = tx.account_keys[2];
let account = accounts let mut account = accounts
.entry(bootstrap_leader_id) .load(&bootstrap_leader_id)
.or_insert_with(Account::default); .cloned()
.unwrap_or_default();
account.tokens += leader_payment; account.tokens += leader_payment;
accounts.store(&bootstrap_leader_id, &account);
self.leader_scheduler.write().unwrap().bootstrap_leader = bootstrap_leader_id; self.leader_scheduler.write().unwrap().bootstrap_leader = bootstrap_leader_id;
trace!( trace!(
"applied genesis payment to bootstrap leader {:?} => {:?}", "applied genesis payment to bootstrap leader {:?} => {:?}",
leader_payment, leader_payment,
@ -1155,8 +1280,6 @@ impl Bank {
); );
} }
} }
self.register_genesis_entry(&entry0.id);
self.register_genesis_entry(&entry1.id);
Ok(self.process_ledger_blocks(entry1.id, 2, entries)?) Ok(self.process_ledger_blocks(entry1.id, 2, entries)?)
} }
@ -1203,17 +1326,17 @@ impl Bank {
.accounts .accounts
.read() .read()
.expect("'accounts' read lock in get_balance"); .expect("'accounts' read lock in get_balance");
accounts.get(pubkey).cloned() accounts.load(pubkey).cloned()
} }
pub fn transaction_count(&self) -> usize { pub fn transaction_count(&self) -> u64 {
self.transaction_count.load(Ordering::Relaxed) self.accounts.read().unwrap().transaction_count()
} }
pub fn get_signature_status(&self, signature: &Signature) -> Result<()> { pub fn get_signature_status(&self, signature: &Signature) -> Result<()> {
let last_ids = self.last_ids.read().unwrap(); let last_ids = self.last_ids.read().unwrap();
for (signatures, _, _) in last_ids.sigs.values() { for entry in last_ids.entries.values() {
if let Some(res) = signatures.get(signature) { if let Some(res) = entry.signature_status.get(signature) {
return res.clone(); return res.clone();
} }
} }
@ -1228,18 +1351,22 @@ impl Bank {
self.last_ids self.last_ids
.read() .read()
.unwrap() .unwrap()
.sigs .entries
.get(last_id) .get(last_id)
.and_then(|sigs| sigs.0.get(signature).cloned()) .and_then(|entry| entry.signature_status.get(signature).cloned())
} }
/// Hash the `accounts` HashMap. This represents a validator's interpretation /// Hash the `accounts` HashMap. This represents a validator's interpretation
/// of the ledger up to the `last_id`, to be sent back to the leader when voting. /// of the delta of the ledger since the last vote and up to now
pub fn hash_internal_state(&self) -> Hash { pub fn hash_internal_state(&self) -> Hash {
let mut ordered_accounts = BTreeMap::new(); let mut ordered_accounts = BTreeMap::new();
for (pubkey, account) in self.accounts.read().unwrap().iter() {
// only hash internal state of the part being voted upon, i.e. since last
// checkpoint
for (pubkey, account) in &self.accounts.read().unwrap().accounts {
ordered_accounts.insert(*pubkey, account.clone()); ordered_accounts.insert(*pubkey, account.clone());
} }
hash(&serialize(&ordered_accounts).unwrap()) hash(&serialize(&ordered_accounts).unwrap())
} }
@ -1283,11 +1410,10 @@ impl Bank {
pub fn get_current_leader(&self) -> Option<Pubkey> { pub fn get_current_leader(&self) -> Option<Pubkey> {
let ls_lock = self.leader_scheduler.read().unwrap(); let ls_lock = self.leader_scheduler.read().unwrap();
let tick_height = self.last_ids.read().unwrap().tick_height; ls_lock.get_scheduled_leader(self.tick_height())
ls_lock.get_scheduled_leader(tick_height)
} }
pub fn get_tick_height(&self) -> u64 { pub fn tick_height(&self) -> u64 {
self.last_ids.read().unwrap().tick_height self.last_ids.read().unwrap().tick_height
} }
@ -1632,7 +1758,7 @@ mod tests {
let signature = Signature::default(); let signature = Signature::default();
for i in 0..MAX_ENTRY_IDS { for i in 0..MAX_ENTRY_IDS {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id); bank.register_tick(&last_id);
} }
// Assert we're no longer able to use the oldest entry ID. // Assert we're no longer able to use the oldest entry ID.
assert_eq!( assert_eq!(
@ -1648,7 +1774,7 @@ mod tests {
let ids: Vec<_> = (0..MAX_ENTRY_IDS) let ids: Vec<_> = (0..MAX_ENTRY_IDS)
.map(|i| { .map(|i| {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id); bank.register_tick(&last_id);
last_id last_id
}).collect(); }).collect();
assert_eq!(bank.count_valid_ids(&[]).len(), 0); assert_eq!(bank.count_valid_ids(&[]).len(), 0);
@ -1788,8 +1914,8 @@ mod tests {
let bank = Bank::default(); let bank = Bank::default();
let (ledger_height, last_id) = bank.process_ledger(ledger).unwrap(); let (ledger_height, last_id) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 4); assert_eq!(ledger_height, 5);
assert_eq!(bank.get_tick_height(), 1); assert_eq!(bank.tick_height(), 2);
assert_eq!(bank.last_id(), last_id); assert_eq!(bank.last_id(), last_id);
} }
@ -2167,4 +2293,67 @@ mod tests {
]; ];
assert!(ids.into_iter().all(move |id| unique.insert(id))); assert!(ids.into_iter().all(move |id| unique.insert(id)));
} }
#[test]
fn test_checkpoint_rollback() {
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob = Keypair::new();
// bob should have 500
bank.transfer(500, &alice.keypair(), bob.pubkey(), alice.last_id())
.unwrap();
assert_eq!(bank.get_balance(&bob.pubkey()), 500);
assert_eq!(bank.checkpoint_depth(), 0);
bank.checkpoint();
bank.checkpoint();
assert_eq!(bank.checkpoint_depth(), 2);
assert_eq!(bank.get_balance(&bob.pubkey()), 500);
assert_eq!(bank.transaction_count(), 1);
// transfer money back, so bob has zero
bank.transfer(500, &bob, alice.keypair().pubkey(), alice.last_id())
.unwrap();
// this has to be stored as zero in the top accounts hashmap ;)
assert_eq!(bank.get_balance(&bob.pubkey()), 0);
assert_eq!(bank.transaction_count(), 2);
bank.rollback();
// bob should have 500 again
assert_eq!(bank.get_balance(&bob.pubkey()), 500);
assert_eq!(bank.transaction_count(), 1);
assert_eq!(bank.checkpoint_depth(), 1);
let signature = Signature::default();
for i in 0..MAX_ENTRY_IDS + 1 {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_tick(&last_id);
}
assert_eq!(bank.tick_height(), MAX_ENTRY_IDS as u64 + 2);
assert_eq!(
bank.reserve_signature_with_last_id_test(&signature, &alice.last_id()),
Err(BankError::LastIdNotFound)
);
bank.rollback();
assert_eq!(bank.tick_height(), 1);
assert_eq!(
bank.reserve_signature_with_last_id_test(&signature, &alice.last_id()),
Ok(())
);
bank.checkpoint();
assert_eq!(
bank.reserve_signature_with_last_id_test(&signature, &alice.last_id()),
Err(BankError::DuplicateSignature)
);
}
#[test]
#[should_panic]
fn test_rollback_panic() {
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
bank.rollback();
}
} }

View File

@ -13,10 +13,12 @@ pub fn id() -> Pubkey {
Pubkey::new(&BPF_LOADER_PROGRAM_ID) Pubkey::new(&BPF_LOADER_PROGRAM_ID)
} }
pub fn populate_account(account: &mut Account) { pub fn account() -> Account {
account.tokens = 0; Account {
account.program_id = id(); tokens: 0,
account.userdata = BPF_LOADER_NAME.as_bytes().to_vec(); program_id: id(),
account.executable = true; userdata: BPF_LOADER_NAME.as_bytes().to_vec(),
account.loader_program_id = native_loader::id(); executable: true,
loader_program_id: native_loader::id(),
}
} }

View File

@ -41,6 +41,7 @@ impl ComputeLeaderFinalityService {
// process_transaction(), case VoteInstruction::RegisterAccount), this will be more accurate. // process_transaction(), case VoteInstruction::RegisterAccount), this will be more accurate.
// See github issue 1654. // See github issue 1654.
bank_accounts bank_accounts
.accounts
.values() .values()
.filter_map(|account| { .filter_map(|account| {
// Filter out any accounts that don't belong to the VoteProgram // Filter out any accounts that don't belong to the VoteProgram
@ -156,7 +157,7 @@ pub mod tests {
let ids: Vec<_> = (0..10) let ids: Vec<_> = (0..10)
.map(|i| { .map(|i| {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id); bank.register_tick(&last_id);
// sleep to get a different timestamp in the bank // sleep to get a different timestamp in the bank
sleep(Duration::from_millis(1)); sleep(Duration::from_millis(1));
last_id last_id

View File

@ -323,7 +323,7 @@ impl Fullnode {
} else { } else {
let max_tick_height = { let max_tick_height = {
let ls_lock = bank.leader_scheduler.read().unwrap(); let ls_lock = bank.leader_scheduler.read().unwrap();
ls_lock.max_height_for_leader(bank.get_tick_height()) ls_lock.max_height_for_leader(bank.tick_height())
}; };
// Start in leader mode. // Start in leader mode.
let (tpu, entry_receiver, tpu_exit) = Tpu::new( let (tpu, entry_receiver, tpu_exit) = Tpu::new(
@ -350,7 +350,7 @@ impl Fullnode {
entry_height, entry_height,
entry_receiver, entry_receiver,
bank.leader_scheduler.clone(), bank.leader_scheduler.clone(),
bank.get_tick_height(), bank.tick_height(),
tpu_exit, tpu_exit,
); );
let leader_state = LeaderServices::new(tpu, broadcast_stage); let leader_state = LeaderServices::new(tpu, broadcast_stage);
@ -450,7 +450,7 @@ impl Fullnode {
// in the active set, then the leader scheduler will pick the same leader again, so // in the active set, then the leader scheduler will pick the same leader again, so
// check for that // check for that
if scheduled_leader == self.keypair.pubkey() { if scheduled_leader == self.keypair.pubkey() {
let tick_height = self.bank.get_tick_height(); let tick_height = self.bank.tick_height();
self.validator_to_leader(tick_height, entry_height, last_entry_id); self.validator_to_leader(tick_height, entry_height, last_entry_id);
Ok(()) Ok(())
} else { } else {
@ -1056,7 +1056,7 @@ mod tests {
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))), Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))),
); );
assert_eq!(bank.get_tick_height(), bootstrap_height); assert_eq!(bank.tick_height(), bootstrap_height);
assert_eq!( assert_eq!(
entry_height, entry_height,
// Only the first genesis entry has num_hashes = 0, every other entry // Only the first genesis entry has num_hashes = 0, every other entry

View File

@ -204,7 +204,7 @@ impl LeaderScheduler {
// Both above cases are calculated by the function: // Both above cases are calculated by the function:
// count_until_next_leader_rotation() + height // count_until_next_leader_rotation() + height
self.count_until_next_leader_rotation(height).expect( self.count_until_next_leader_rotation(height).expect(
"Should return some value when not using default implementation "Should return some value when not using default implementation
of LeaderScheduler", of LeaderScheduler",
) + height ) + height
} else { } else {
@ -286,9 +286,11 @@ impl LeaderScheduler {
let lower_bound = height.saturating_sub(self.active_window_length); let lower_bound = height.saturating_sub(self.active_window_length);
{ {
let bank_accounts = &bank.accounts.read().unwrap(); let accounts = bank.accounts.read().unwrap();
bank_accounts // TODO: iterate through checkpoints, too
accounts
.accounts
.values() .values()
.filter_map(|account| { .filter_map(|account| {
if VoteProgram::check_id(&account.program_id) { if VoteProgram::check_id(&account.program_id) {

View File

@ -97,11 +97,9 @@ impl Mint {
pub fn create_entries(&self) -> Vec<Entry> { pub fn create_entries(&self) -> Vec<Entry> {
let e0 = Entry::new(&self.seed(), 0, vec![]); let e0 = Entry::new(&self.seed(), 0, vec![]);
let e1 = Entry::new(&e0.id, 1, self.create_transaction());
// Create the transactions that give the mint the initial tokens, and gives the first let e2 = Entry::new(&e1.id, 1, vec![]); // include a tick
// leader the initial tokens vec![e0, e1, e2]
let e1 = Entry::new(&self.seed(), 0, self.create_transaction());
vec![e0, e1]
} }
} }

View File

@ -1,5 +1,5 @@
//! The `poh_recorder` module provides an object for synchronizing with Proof of History. //! The `poh_recorder` module provides an object for synchronizing with Proof of History.
//! It synchronizes PoH, bank's register_entry_id and the ledger //! It synchronizes PoH, bank's register_tick and the ledger
//! //!
use bank::Bank; use bank::Bank;
use entry::Entry; use entry::Entry;
@ -86,7 +86,7 @@ impl PohRecorder {
is_virtual: bool, is_virtual: bool,
virtual_tick_entries: Vec<Entry>, virtual_tick_entries: Vec<Entry>,
) -> Self { ) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, bank.get_tick_height()))); let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, bank.tick_height())));
let virtual_tick_entries = Arc::new(Mutex::new(virtual_tick_entries)); let virtual_tick_entries = Arc::new(Mutex::new(virtual_tick_entries));
PohRecorder { PohRecorder {
poh, poh,
@ -134,7 +134,7 @@ impl PohRecorder {
fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> { fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> {
let tick_entry = self.generate_tick_entry(poh); let tick_entry = self.generate_tick_entry(poh);
self.bank.register_entry_id(&tick_entry.id); self.bank.register_tick(&tick_entry.id);
self.sender.send(vec![tick_entry])?; self.sender.send(vec![tick_entry])?;
Ok(()) Ok(())
} }

View File

@ -170,7 +170,7 @@ impl ReplicateStage {
if leader_id == keypair.pubkey() { if leader_id == keypair.pubkey() {
return Some(ReplicateStageReturnType::LeaderRotation( return Some(ReplicateStageReturnType::LeaderRotation(
bank.get_tick_height(), bank.tick_height(),
entry_height_, entry_height_,
// We should never start the TPU / this stage on an exact entry that causes leader // We should never start the TPU / this stage on an exact entry that causes leader
// rotation (Fullnode should automatically transition on startup if it detects // rotation (Fullnode should automatically transition on startup if it detects

View File

@ -13,10 +13,12 @@ pub fn id() -> Pubkey {
Pubkey::new(&ERC20_PROGRAM_ID) Pubkey::new(&ERC20_PROGRAM_ID)
} }
pub fn populate_account(account: &mut Account) { pub fn account() -> Account {
account.tokens = 0; Account {
account.program_id = id(); tokens: 0,
account.userdata = ERC20_NAME.as_bytes().to_vec(); program_id: id(),
account.executable = true; userdata: ERC20_NAME.as_bytes().to_vec(),
account.loader_program_id = native_loader::id(); executable: true,
loader_program_id: native_loader::id(),
}
} }

View File

@ -108,7 +108,7 @@ impl Tvu {
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
&cluster_info, &cluster_info,
window, window,
bank.get_tick_height(), bank.tick_height(),
entry_height, entry_height,
Arc::new(retransmit_socket), Arc::new(retransmit_socket),
repair_socket, repair_socket,
@ -311,7 +311,7 @@ pub mod tests {
let bob_keypair = Keypair::new(); let bob_keypair = Keypair::new();
for i in 0..num_transfers { for i in 0..num_transfers {
let entry0 = Entry::new(&cur_hash, i, vec![]); let entry0 = Entry::new(&cur_hash, i, vec![]);
bank.register_entry_id(&cur_hash); bank.register_tick(&cur_hash);
cur_hash = hash(&cur_hash.as_ref()); cur_hash = hash(&cur_hash.as_ref());
let tx0 = Transaction::system_new( let tx0 = Transaction::system_new(
@ -320,10 +320,10 @@ pub mod tests {
transfer_amount, transfer_amount,
cur_hash, cur_hash,
); );
bank.register_entry_id(&cur_hash); bank.register_tick(&cur_hash);
cur_hash = hash(&cur_hash.as_ref()); cur_hash = hash(&cur_hash.as_ref());
let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]); let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]);
bank.register_entry_id(&cur_hash); bank.register_tick(&cur_hash);
cur_hash = hash(&cur_hash.as_ref()); cur_hash = hash(&cur_hash.as_ref());
alice_ref_balance -= transfer_amount; alice_ref_balance -= transfer_amount;

View File

@ -32,7 +32,7 @@ pub fn create_new_signed_vote_blob(
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Result<SharedBlob> { ) -> Result<SharedBlob> {
let shared_blob = SharedBlob::default(); let shared_blob = SharedBlob::default();
let tick_height = bank.get_tick_height(); let tick_height = bank.tick_height();
let leader_tpu = get_leader_tpu(bank, cluster_info)?; let leader_tpu = get_leader_tpu(bank, cluster_info)?;
//TODO: doesn't seem like there is a synchronous call to get height and id //TODO: doesn't seem like there is a synchronous call to get height and id

View File

@ -889,7 +889,7 @@ fn test_leader_to_validator_transition() {
Arc::new(RwLock::new(LeaderScheduler::default())), Arc::new(RwLock::new(LeaderScheduler::default())),
); );
assert_eq!(bank.get_tick_height(), bootstrap_height); assert_eq!(bank.tick_height(), bootstrap_height);
// Shut down // Shut down
ncp.close().unwrap(); ncp.close().unwrap();