diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 931607183..3fdfd39f2 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -160,20 +160,6 @@ impl Blocktree { self.orphans_cf.get(slot) } - pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> { - if let Some(mut meta) = self.meta_cf.get(slot)? { - for index in 0..meta.received { - self.data_cf.delete((slot, index))?; - } - meta.consumed = 0; - meta.received = 0; - meta.last_index = std::u64::MAX; - meta.next_slots = vec![]; - self.meta_cf.put(0, &meta)?; - } - Ok(()) - } - pub fn get_next_slot(&self, slot: u64) -> Result> { let mut db_iterator = self.db.cursor::()?; db_iterator.seek(slot + 1); @@ -771,8 +757,22 @@ impl Blocktree { Ok(entries) } - pub fn set_root(&self, root: u64) { - *self.root_slot.write().unwrap() = root; + pub fn is_root(&self, slot: u64) -> bool { + if let Ok(Some(meta)) = self.meta(slot) { + meta.is_root + } else { + false + } + } + + pub fn set_root(&self, slot: u64) -> Result<()> { + *self.root_slot.write().unwrap() = slot; + + if let Some(mut meta) = self.meta_cf.get(slot)? { + meta.is_root = true; + self.meta_cf.put(slot, &meta)?; + } + Ok(()) } pub fn get_orphans(&self, max: Option) -> Vec { @@ -1481,35 +1481,6 @@ pub mod tests { Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } - #[test] - fn test_overwrite_entries() { - solana_logger::setup(); - let ledger_path = get_tmp_ledger_path!(); - - let ticks_per_slot = 10; - let num_ticks = 2; - let mut ticks = create_ticks(num_ticks * 2, Hash::default()); - let ticks2 = ticks.split_off(num_ticks as usize); - assert_eq!(ticks.len(), ticks2.len()); - { - let ledger = Blocktree::open(&ledger_path).unwrap(); - - ledger - .write_entries(0, 0, 0, ticks_per_slot, &ticks) - .unwrap(); - ledger.reset_slot_consumed(0).unwrap(); - ledger - .write_entries(0, 0, 0, ticks_per_slot, &ticks2) - .unwrap(); - - let ledger_ticks = ledger.get_slot_entries(0, 0, None).unwrap(); - - assert_eq!(ledger_ticks.len(), ticks2.len()); - assert_eq!(ledger_ticks, ticks2); - } - Blocktree::destroy(&ledger_path).unwrap(); - } - #[test] fn test_put_get_simple() { let ledger_path = get_tmp_ledger_path("test_put_get_simple"); diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 02c454816..824d468c9 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -24,6 +24,8 @@ pub struct SlotMeta { // True if this slot is full (consumed == last_index + 1) and if every // slot that is a parent of this slot is also connected. pub is_connected: bool, + // True if this slot is a root + pub is_root: bool, } impl SlotMeta { @@ -51,6 +53,7 @@ impl SlotMeta { parent_slot, next_slots: vec![], is_connected: slot == 0, + is_root: false, last_index: std::u64::MAX, } } diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 2fff61cf0..7a1200e06 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -185,8 +185,11 @@ pub fn process_blocktree( entry_height += entries.len() as u64; } - // TODO merge with locktower, voting, bank.vote_accounts()... - bank.squash(); + bank.freeze(); // all banks handled by this routine are created from complete slots + + if blocktree.is_root(slot) { + bank.squash(); + } if meta.next_slots.is_empty() { // Reached the end of this fork. Record the final entry height and last entry.hash @@ -354,7 +357,7 @@ mod tests { slot 0 | - slot 1 + slot 1 <-- set_root(true) / \ slot 2 | / | @@ -381,6 +384,9 @@ mod tests { info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash); info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash); + blocktree.set_root(0).unwrap(); + blocktree.set_root(1).unwrap(); + let (bank_forks, bank_forks_info) = process_blocktree(&genesis_block, &blocktree, None).unwrap(); @@ -392,6 +398,14 @@ mod tests { entry_height: ticks_per_slot * 4, } ); + assert_eq!( + &bank_forks[3] + .parents() + .iter() + .map(|bank| bank.slot()) + .collect::>(), + &[2, 1] + ); assert_eq!( bank_forks_info[1], BankForksInfo { @@ -399,9 +413,16 @@ mod tests { entry_height: ticks_per_slot * 3, } ); + assert_eq!( + &bank_forks[4] + .parents() + .iter() + .map(|bank| bank.slot()) + .collect::>(), + &[1] + ); - // Ensure bank_forks holds the right banks, and that everything's - // frozen + // Ensure bank_forks holds the right banks for info in bank_forks_info { assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot); assert!(bank_forks[info.bank_slot].is_frozen()); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 221e3f403..3a8a21276 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -9,7 +9,7 @@ use crate::leader_schedule_utils; use crate::locktower::{Locktower, StakeLockout}; use crate::packet::BlobError; use crate::poh_recorder::PohRecorder; -use crate::result; +use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use hashbrown::HashMap; @@ -50,7 +50,7 @@ impl Drop for Finalizer { } pub struct ReplayStage { - t_replay: JoinHandle>, + t_replay: JoinHandle>, } #[derive(Default)] @@ -98,7 +98,9 @@ impl ReplayStage { let mut ticks_per_slot = 0; let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id); if let Some(root) = locktower.root() { - blocktree.set_root(root); + blocktree + .set_root(root) + .expect("blocktree.set_root() failed at replay_stage startup"); } // Start the replay stage loop let t_replay = Builder::new() @@ -148,7 +150,7 @@ impl ReplayStage { &vote_account, &cluster_info, &blocktree, - ); + )?; Self::reset_poh_recorder( &my_id, @@ -271,7 +273,7 @@ impl ReplayStage { blocktree: &Blocktree, progress: &mut HashMap, forward_entry_sender: &EntrySender, - ) -> result::Result<()> { + ) -> Result<()> { let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?; let len = entries.len(); let result = @@ -296,12 +298,13 @@ impl ReplayStage { vote_account_pubkey: &Pubkey, cluster_info: &Arc>, blocktree: &Arc, - ) where + ) -> Result<()> + where T: 'static + KeypairUtil + Send + Sync, { if let Some(new_root) = locktower.record_vote(bank.slot()) { bank_forks.write().unwrap().set_root(new_root); - blocktree.set_root(new_root); + blocktree.set_root(new_root)?; Self::handle_new_root(&bank_forks, progress); } locktower.update_epoch(&bank); @@ -315,6 +318,7 @@ impl ReplayStage { ); cluster_info.write().unwrap().push_vote(vote_tx); } + Ok(()) } fn reset_poh_recorder( @@ -349,7 +353,7 @@ impl ReplayStage { progress: &mut HashMap, forward_entry_sender: &EntrySender, slot_full_sender: &Sender<(u64, Pubkey)>, - ) -> result::Result<()> { + ) -> Result<()> { let active_banks = bank_forks.read().unwrap().active_banks(); trace!("active banks {:?}", active_banks); @@ -484,7 +488,7 @@ impl ReplayStage { bank: &Bank, blocktree: &Blocktree, progress: &mut HashMap, - ) -> result::Result<(Vec, usize)> { + ) -> Result<(Vec, usize)> { let bank_slot = bank.slot(); let bank_progress = &mut progress .entry(bank_slot) @@ -498,7 +502,7 @@ impl ReplayStage { progress: &mut HashMap, forward_entry_sender: &EntrySender, num: usize, - ) -> result::Result<()> { + ) -> Result<()> { let bank_progress = &mut progress .entry(bank.slot()) .or_insert(ForkProgress::new(bank.last_blockhash())); @@ -517,7 +521,7 @@ impl ReplayStage { bank: &Bank, entries: &[Entry], last_entry: &Hash, - ) -> result::Result<()> { + ) -> Result<()> { if !entries.verify(last_entry) { trace!( "entry verification failed {} {} {} {}", @@ -526,7 +530,7 @@ impl ReplayStage { last_entry, bank.last_blockhash() ); - return Err(result::Error::BlobError(BlobError::VerificationFailed)); + return Err(Error::BlobError(BlobError::VerificationFailed)); } blocktree_processor::process_entries(bank, entries)?;