From 3d70afc578060ea32353891eb1cc5f28bf2d4610 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 16 Feb 2019 12:00:35 -0700 Subject: [PATCH] Boot leader scheduler from the bank Functional change: the leader scheduler is no longer implicitly updated by PohRecorder via register_tick(). That's intended to be a "feature" (crossing fingers). --- src/bank.rs | 59 ++++++++++++++++++++------------------ src/blocktree_processor.rs | 31 ++++++++++++++++---- src/fullnode.rs | 3 +- src/replay_stage.rs | 2 +- 4 files changed, 59 insertions(+), 36 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 4c75838b0..2b0eb548d 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -101,20 +101,15 @@ pub struct Bank { /// FIFO queue of `last_id` items last_id_queue: RwLock, - /// Tracks and updates the leader schedule based on the votes and account stakes - /// processed by the bank - leader_scheduler: Option>>, - subscriptions: RwLock>>, } impl Default for Bank { fn default() -> Self { - Bank { + Self { accounts: Accounts::default(), last_id_queue: RwLock::new(LastIdQueue::default()), status_cache: RwLock::new(BankStatusCache::default()), - leader_scheduler: None, subscriptions: RwLock::new(None), } } @@ -132,12 +127,11 @@ impl Bank { genesis_block: &GenesisBlock, leader_scheduler: Arc>, ) -> Self { - let mut bank = Bank::new(genesis_block); + let bank = Bank::new(genesis_block); leader_scheduler .write() .unwrap() .update_tick_height(0, &bank); - bank.leader_scheduler = Some(leader_scheduler); bank } @@ -153,7 +147,6 @@ impl Bank { accounts: self.accounts.copy_for_tpu(), status_cache: RwLock::new(status_cache), last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()), - leader_scheduler: self.leader_scheduler.clone(), subscriptions: RwLock::new(None), } } @@ -319,18 +312,9 @@ impl Bank { /// the oldest ones once its internal cache is full. Once boot, the /// bank will reject transactions using that `last_id`. pub fn register_tick(&self, last_id: &Hash) { - let current_tick_height = { - let mut last_id_queue = self.last_id_queue.write().unwrap(); - inc_new_counter_info!("bank-register_tick-registered", 1); - last_id_queue.register_tick(last_id); - last_id_queue.tick_height - }; - if let Some(leader_scheduler) = &self.leader_scheduler { - leader_scheduler - .write() - .unwrap() - .update_tick_height(current_tick_height, self); - } + let mut last_id_queue = self.last_id_queue.write().unwrap(); + inc_new_counter_info!("bank-register_tick-registered", 1); + last_id_queue.register_tick(last_id); } /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. @@ -652,8 +636,12 @@ impl Bank { } /// Process an ordered list of entries. - pub fn process_entries(&self, entries: &[Entry]) -> Result<()> { - self.par_process_entries(entries) + pub fn process_entries( + &self, + entries: &[Entry], + leader_scheduler: &Arc>, + ) -> Result<()> { + self.par_process_entries_with_scheduler(entries, leader_scheduler) } pub fn first_err(results: &[Result<()>]) -> Result<()> { @@ -700,8 +688,13 @@ impl Bank { /// process entries in parallel /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry /// 2. Process the locked group in parallel - /// 3. Register the `Tick` if it's available, goto 1 - pub fn par_process_entries(&self, entries: &[Entry]) -> Result<()> { + /// 3. Register the `Tick` if it's available + /// 4. Update the leader scheduler, goto 1 + fn par_process_entries_with_scheduler( + &self, + entries: &[Entry], + leader_scheduler: &Arc>, + ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut mt_group = vec![]; for entry in entries { @@ -709,6 +702,10 @@ impl Bank { // if its a tick, execute the group and register the tick self.par_execute_entries(&mt_group)?; self.register_tick(&entry.id); + leader_scheduler + .write() + .unwrap() + .update_tick_height(self.tick_height(), self); mt_group = vec![]; continue; } @@ -732,6 +729,12 @@ impl Bank { Ok(()) } + #[cfg(test)] + fn par_process_entries(&self, entries: &[Entry]) -> Result<()> { + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); + self.par_process_entries_with_scheduler(entries, &leader_scheduler) + } + /// Create, sign, and process a Transaction from `keypair` to `to` of /// `n` tokens where `last_id` is the last Entry ID observed by the client. pub fn transfer( @@ -1122,7 +1125,7 @@ mod tests { ); // Now ensure the TX is accepted despite pointing to the ID of an empty entry. - bank.process_entries(&[entry]).unwrap(); + bank.par_process_entries(&[entry]).unwrap(); assert_eq!(bank.process_transaction(&tx), Ok(())); } @@ -1188,11 +1191,11 @@ mod tests { let bank0 = Bank::default(); bank0.add_builtin_programs(); bank0.process_genesis_block(&genesis_block); - bank0.process_entries(&entries0).unwrap(); + bank0.par_process_entries(&entries0).unwrap(); let bank1 = Bank::default(); bank1.add_builtin_programs(); bank1.process_genesis_block(&genesis_block); - bank1.process_entries(&entries1).unwrap(); + bank1.par_process_entries(&entries1).unwrap(); let initial_state = bank0.hash_internal_state(); diff --git a/src/blocktree_processor.rs b/src/blocktree_processor.rs index c62bfa03d..e2344d7d8 100644 --- a/src/blocktree_processor.rs +++ b/src/blocktree_processor.rs @@ -1,16 +1,26 @@ use crate::bank::{Bank, BankError, Result}; use crate::blocktree::Blocktree; use crate::entry::{Entry, EntrySlice}; +use crate::leader_scheduler::LeaderScheduler; use itertools::Itertools; use solana_sdk::hash::Hash; +use std::sync::{Arc, RwLock}; pub const VERIFY_BLOCK_SIZE: usize = 16; /// Process an ordered list of entries, populating a circular buffer "tail" /// as we go. -fn process_block(bank: &Bank, entries: &[Entry]) -> Result<()> { +fn process_block( + bank: &Bank, + entries: &[Entry], + leader_scheduler: &Arc>, +) -> Result<()> { for entry in entries { bank.process_entry(entry)?; + if entry.is_tick() { + let mut leader_scheduler = leader_scheduler.write().unwrap(); + leader_scheduler.update_tick_height(bank.tick_height(), bank); + } } Ok(()) @@ -18,7 +28,11 @@ fn process_block(bank: &Bank, entries: &[Entry]) -> Result<()> { /// Starting from the genesis block, append the provided entries to the ledger verifying them /// along the way. -fn process_ledger(bank: &Bank, entries: I) -> Result<(u64, Hash)> +fn process_ledger( + bank: &Bank, + entries: I, + leader_scheduler: &Arc>, +) -> Result<(u64, Hash)> where I: IntoIterator, { @@ -50,7 +64,7 @@ where return Err(BankError::LedgerVerificationFailed); } - process_block(bank, &block)?; + process_block(bank, &block, leader_scheduler)?; last_entry_id = block.last().unwrap().id; entry_height += block.len() as u64; @@ -58,9 +72,13 @@ where Ok((entry_height, last_entry_id)) } -pub fn process_blocktree(bank: &Bank, blocktree: &Blocktree) -> Result<(u64, Hash)> { +pub fn process_blocktree( + bank: &Bank, + blocktree: &Blocktree, + leader_scheduler: &Arc>, +) -> Result<(u64, Hash)> { let entries = blocktree.read_ledger().expect("opening ledger"); - process_ledger(&bank, entries) + process_ledger(&bank, entries, leader_scheduler) } #[cfg(test)] @@ -133,7 +151,8 @@ mod tests { let bank = Bank::new(&genesis_block); assert_eq!(bank.tick_height(), 0); assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 100); - let (ledger_height, last_id) = process_ledger(&bank, ledger).unwrap(); + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); + let (ledger_height, last_id) = process_ledger(&bank, ledger, &leader_scheduler).unwrap(); assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 100 - 3); assert_eq!(ledger_height, 8); assert_eq!(bank.tick_height(), 1); diff --git a/src/fullnode.rs b/src/fullnode.rs index fce9faf59..41bb14de6 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -471,7 +471,8 @@ pub fn new_bank_from_ledger( let now = Instant::now(); info!("processing ledger..."); let (entry_height, last_entry_id) = - blocktree_processor::process_blocktree(&bank, &blocktree).expect("process_blocktree"); + blocktree_processor::process_blocktree(&bank, &blocktree, leader_scheduler) + .expect("process_blocktree"); info!( "processed {} ledger entries in {}ms, tick_height={}...", entry_height, diff --git a/src/replay_stage.rs b/src/replay_stage.rs index b75a33ab8..b378fc7f5 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -110,7 +110,7 @@ impl ReplayStage { // If we don't process the entry now, the for loop will exit and the entry // will be dropped. if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() { - res = bank.process_entries(&entries[0..=i]); + res = bank.process_entries(&entries[0..=i], leader_scheduler); if res.is_err() { // TODO: This will return early from the first entry that has an erroneous