Boot leader scheduler from the bank

Functional change: the leader scheduler is no longer implicitly
updated by PohRecorder via register_tick(). That's intended to
be a "feature" (crossing fingers).
Greg Fitzgerald 2019-02-16 12:00:35 -07:00
parent b919b3e3b2
commit 3d70afc578
4 changed files with 59 additions and 36 deletions
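
The net effect is an inverted dependency: rather than the Bank holding an `Option<Arc<RwLock<LeaderScheduler>>>` and advancing it as a side effect of `register_tick()`, the scheduler is now threaded explicitly through every entry-processing path, and the schedule update happens at the call site. A minimal standalone sketch of the two shapes — not code from this commit, and using stand-in types in place of the real `Bank` and `LeaderScheduler`:

use std::sync::{Arc, RwLock};

struct Scheduler {
    tick_height: u64,
}

impl Scheduler {
    fn update_tick_height(&mut self, tick_height: u64) {
        self.tick_height = tick_height;
    }
}

// Before: the bank owns an optional scheduler and updates it as a side
// effect of registering a tick.
#[allow(dead_code)]
struct ImplicitBank {
    tick_height: u64,
    scheduler: Option<Arc<RwLock<Scheduler>>>,
}

#[allow(dead_code)]
impl ImplicitBank {
    fn register_tick(&mut self) {
        self.tick_height += 1;
        if let Some(scheduler) = &self.scheduler {
            scheduler
                .write()
                .unwrap()
                .update_tick_height(self.tick_height);
        }
    }
}

// After: the bank only counts ticks; the caller replaying entries decides
// when to advance the scheduler.
struct ExplicitBank {
    tick_height: u64,
}

impl ExplicitBank {
    fn register_tick(&mut self) {
        self.tick_height += 1;
    }
}

fn main() {
    let scheduler = Arc::new(RwLock::new(Scheduler { tick_height: 0 }));
    let mut bank = ExplicitBank { tick_height: 0 };

    // What was implicit inside register_tick() is now two explicit steps.
    bank.register_tick();
    scheduler
        .write()
        .unwrap()
        .update_tick_height(bank.tick_height);

    assert_eq!(scheduler.read().unwrap().tick_height, 1);
}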


@@ -101,20 +101,15 @@ pub struct Bank {
     /// FIFO queue of `last_id` items
     last_id_queue: RwLock<LastIdQueue>,
-    /// Tracks and updates the leader schedule based on the votes and account stakes
-    /// processed by the bank
-    leader_scheduler: Option<Arc<RwLock<LeaderScheduler>>>,
     subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,
 }

 impl Default for Bank {
     fn default() -> Self {
-        Bank {
+        Self {
             accounts: Accounts::default(),
             last_id_queue: RwLock::new(LastIdQueue::default()),
             status_cache: RwLock::new(BankStatusCache::default()),
-            leader_scheduler: None,
             subscriptions: RwLock::new(None),
         }
     }

@@ -132,12 +127,11 @@ impl Bank {
         genesis_block: &GenesisBlock,
         leader_scheduler: Arc<RwLock<LeaderScheduler>>,
     ) -> Self {
-        let mut bank = Bank::new(genesis_block);
+        let bank = Bank::new(genesis_block);
         leader_scheduler
             .write()
             .unwrap()
             .update_tick_height(0, &bank);
-        bank.leader_scheduler = Some(leader_scheduler);
         bank
     }

@@ -153,7 +147,6 @@ impl Bank {
             accounts: self.accounts.copy_for_tpu(),
             status_cache: RwLock::new(status_cache),
             last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
-            leader_scheduler: self.leader_scheduler.clone(),
             subscriptions: RwLock::new(None),
         }
     }

@@ -319,18 +312,9 @@ impl Bank {
     /// the oldest ones once its internal cache is full. Once boot, the
     /// bank will reject transactions using that `last_id`.
     pub fn register_tick(&self, last_id: &Hash) {
-        let current_tick_height = {
-            let mut last_id_queue = self.last_id_queue.write().unwrap();
-            inc_new_counter_info!("bank-register_tick-registered", 1);
-            last_id_queue.register_tick(last_id);
-            last_id_queue.tick_height
-        };
-        if let Some(leader_scheduler) = &self.leader_scheduler {
-            leader_scheduler
-                .write()
-                .unwrap()
-                .update_tick_height(current_tick_height, self);
-        }
+        let mut last_id_queue = self.last_id_queue.write().unwrap();
+        inc_new_counter_info!("bank-register_tick-registered", 1);
+        last_id_queue.register_tick(last_id);
     }

     /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.

@@ -652,8 +636,12 @@ impl Bank {
     }

     /// Process an ordered list of entries.
-    pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
-        self.par_process_entries(entries)
+    pub fn process_entries(
+        &self,
+        entries: &[Entry],
+        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+    ) -> Result<()> {
+        self.par_process_entries_with_scheduler(entries, leader_scheduler)
     }

     pub fn first_err(results: &[Result<()>]) -> Result<()> {

@@ -700,8 +688,13 @@ impl Bank {
     /// process entries in parallel
     /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
     /// 2. Process the locked group in parallel
-    /// 3. Register the `Tick` if it's available, goto 1
-    pub fn par_process_entries(&self, entries: &[Entry]) -> Result<()> {
+    /// 3. Register the `Tick` if it's available
+    /// 4. Update the leader scheduler, goto 1
+    fn par_process_entries_with_scheduler(
+        &self,
+        entries: &[Entry],
+        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+    ) -> Result<()> {
         // accumulator for entries that can be processed in parallel
         let mut mt_group = vec![];
         for entry in entries {

@@ -709,6 +702,10 @@ impl Bank {
                 // if its a tick, execute the group and register the tick
                 self.par_execute_entries(&mt_group)?;
                 self.register_tick(&entry.id);
+                leader_scheduler
+                    .write()
+                    .unwrap()
+                    .update_tick_height(self.tick_height(), self);
                 mt_group = vec![];
                 continue;
             }

@@ -732,6 +729,12 @@ impl Bank {
         Ok(())
     }

+    #[cfg(test)]
+    fn par_process_entries(&self, entries: &[Entry]) -> Result<()> {
+        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
+        self.par_process_entries_with_scheduler(entries, &leader_scheduler)
+    }
+
     /// Create, sign, and process a Transaction from `keypair` to `to` of
     /// `n` tokens where `last_id` is the last Entry ID observed by the client.
     pub fn transfer(

@@ -1122,7 +1125,7 @@ mod tests {
         );

         // Now ensure the TX is accepted despite pointing to the ID of an empty entry.
-        bank.process_entries(&[entry]).unwrap();
+        bank.par_process_entries(&[entry]).unwrap();
         assert_eq!(bank.process_transaction(&tx), Ok(()));
     }

@@ -1188,11 +1191,11 @@ mod tests {
         let bank0 = Bank::default();
         bank0.add_builtin_programs();
         bank0.process_genesis_block(&genesis_block);
-        bank0.process_entries(&entries0).unwrap();
+        bank0.par_process_entries(&entries0).unwrap();

         let bank1 = Bank::default();
         bank1.add_builtin_programs();
         bank1.process_genesis_block(&genesis_block);
-        bank1.process_entries(&entries1).unwrap();
+        bank1.par_process_entries(&entries1).unwrap();

         let initial_state = bank0.hash_internal_state();
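
For orientation, the four-step loop described in the doc comment of `par_process_entries_with_scheduler` above amounts to tick-delimited batching. A simplified, self-contained sketch of that loop — not the commit's code: it uses a stand-in `Entry` type, a bare tick counter in place of `LeaderScheduler`, and sequential rather than parallel execution:

use std::sync::{Arc, RwLock};

// Stand-in entry: either a tick or a transaction payload.
enum Entry {
    Tick,
    Tx(&'static str),
}

impl Entry {
    fn is_tick(&self) -> bool {
        matches!(self, Entry::Tick)
    }
}

// Execute a batch; the real code locks accounts and runs the group in parallel.
fn execute_group(group: &[&Entry]) {
    for entry in group {
        if let Entry::Tx(payload) = entry {
            println!("executing {}", payload);
        }
    }
}

// 1. accumulate entries until a tick arrives
// 2. execute the accumulated group
// 3. register the tick (here: bump a counter)
// 4. update the scheduler from the new tick height, then go back to 1
fn process_entries(entries: &[Entry], tick_height: &Arc<RwLock<u64>>) {
    let mut mt_group: Vec<&Entry> = vec![];
    for entry in entries {
        if entry.is_tick() {
            execute_group(&mt_group);
            *tick_height.write().unwrap() += 1;
            mt_group = vec![];
            continue;
        }
        mt_group.push(entry);
    }
    execute_group(&mt_group); // flush any trailing partial group
}

fn main() {
    let tick_height = Arc::new(RwLock::new(0u64));
    let entries = [Entry::Tx("a"), Entry::Tx("b"), Entry::Tick, Entry::Tx("c")];
    process_entries(&entries, &tick_height);
    assert_eq!(*tick_height.read().unwrap(), 1);
}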


@@ -1,16 +1,26 @@
 use crate::bank::{Bank, BankError, Result};
 use crate::blocktree::Blocktree;
 use crate::entry::{Entry, EntrySlice};
+use crate::leader_scheduler::LeaderScheduler;
 use itertools::Itertools;
 use solana_sdk::hash::Hash;
+use std::sync::{Arc, RwLock};

 pub const VERIFY_BLOCK_SIZE: usize = 16;

 /// Process an ordered list of entries, populating a circular buffer "tail"
 /// as we go.
-fn process_block(bank: &Bank, entries: &[Entry]) -> Result<()> {
+fn process_block(
+    bank: &Bank,
+    entries: &[Entry],
+    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+) -> Result<()> {
     for entry in entries {
         bank.process_entry(entry)?;
+        if entry.is_tick() {
+            let mut leader_scheduler = leader_scheduler.write().unwrap();
+            leader_scheduler.update_tick_height(bank.tick_height(), bank);
+        }
     }

     Ok(())

@@ -18,7 +28,11 @@ fn process_block(bank: &Bank, entries: &[Entry]) -> Result<()> {
 /// Starting from the genesis block, append the provided entries to the ledger verifying them
 /// along the way.
-fn process_ledger<I>(bank: &Bank, entries: I) -> Result<(u64, Hash)>
+fn process_ledger<I>(
+    bank: &Bank,
+    entries: I,
+    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+) -> Result<(u64, Hash)>
 where
     I: IntoIterator<Item = Entry>,
 {

@@ -50,7 +64,7 @@ where
             return Err(BankError::LedgerVerificationFailed);
         }

-        process_block(bank, &block)?;
+        process_block(bank, &block, leader_scheduler)?;

         last_entry_id = block.last().unwrap().id;
         entry_height += block.len() as u64;

@@ -58,9 +72,13 @@ where
     Ok((entry_height, last_entry_id))
 }

-pub fn process_blocktree(bank: &Bank, blocktree: &Blocktree) -> Result<(u64, Hash)> {
+pub fn process_blocktree(
+    bank: &Bank,
+    blocktree: &Blocktree,
+    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+) -> Result<(u64, Hash)> {
     let entries = blocktree.read_ledger().expect("opening ledger");
-    process_ledger(&bank, entries)
+    process_ledger(&bank, entries, leader_scheduler)
 }

 #[cfg(test)]

@@ -133,7 +151,8 @@ mod tests {
         let bank = Bank::new(&genesis_block);
         assert_eq!(bank.tick_height(), 0);
         assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 100);
-        let (ledger_height, last_id) = process_ledger(&bank, ledger).unwrap();
+        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
+        let (ledger_height, last_id) = process_ledger(&bank, ledger, &leader_scheduler).unwrap();
         assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 100 - 3);
         assert_eq!(ledger_height, 8);
         assert_eq!(bank.tick_height(), 1);


@@ -471,7 +471,8 @@ pub fn new_bank_from_ledger(
     let now = Instant::now();
     info!("processing ledger...");
     let (entry_height, last_entry_id) =
-        blocktree_processor::process_blocktree(&bank, &blocktree).expect("process_blocktree");
+        blocktree_processor::process_blocktree(&bank, &blocktree, leader_scheduler)
+            .expect("process_blocktree");
     info!(
         "processed {} ledger entries in {}ms, tick_height={}...",
         entry_height,


@@ -110,7 +110,7 @@ impl ReplayStage {
                 // If we don't process the entry now, the for loop will exit and the entry
                 // will be dropped.
                 if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() {
-                    res = bank.process_entries(&entries[0..=i]);
+                    res = bank.process_entries(&entries[0..=i], leader_scheduler);
                     if res.is_err() {
                         // TODO: This will return early from the first entry that has an erroneous